mz_storage_client/
controller.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller that provides an interface to the storage layer.
11//!
12//! The storage controller curates the creation of sources, the progress of readers through these collections,
13//! and their eventual dropping and resource reclamation.
14//!
15//! The storage controller can be viewed as a partial map from `GlobalId` to collection. It is an error to
//! use an identifier before it has been "created" with `create_collections()`. Once created, the controller holds
17//! a read capability for each source, which is manipulated with `update_read_capabilities()`.
18//! Eventually, the source is dropped with either `drop_sources()` or by allowing compaction to the
19//! empty frontier.
20
21use std::collections::{BTreeMap, BTreeSet};
22use std::fmt::Debug;
23use std::future::Future;
24use std::num::NonZeroI64;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::time::Duration;
28
29use async_trait::async_trait;
30use chrono::{DateTime, Utc};
31use differential_dataflow::lattice::Lattice;
32use mz_cluster_client::ReplicaId;
33use mz_cluster_client::client::ClusterReplicaLocation;
34use mz_controller_types::dyncfgs::WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL;
35use mz_dyncfg::ConfigSet;
36use mz_ore::soft_panic_or_log;
37use mz_persist_client::batch::ProtoBatch;
38use mz_persist_types::{Codec64, Opaque, ShardId};
39use mz_repr::adt::interval::Interval;
40use mz_repr::adt::timestamp::CheckedTimestamp;
41use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row};
42use mz_storage_types::configuration::StorageConfiguration;
43use mz_storage_types::connections::inline::InlinedConnection;
44use mz_storage_types::controller::{CollectionMetadata, StorageError};
45use mz_storage_types::errors::CollectionMissing;
46use mz_storage_types::instances::StorageInstanceId;
47use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
48use mz_storage_types::parameters::StorageParameters;
49use mz_storage_types::read_holds::ReadHold;
50use mz_storage_types::read_policy::ReadPolicy;
51use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
52use mz_storage_types::sources::{
53    GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
54    SourceExportDetails, Timeline,
55};
56use serde::{Deserialize, Serialize};
57use timely::progress::Timestamp as TimelyTimestamp;
58use timely::progress::frontier::MutableAntichain;
59use timely::progress::{Antichain, Timestamp};
60use tokio::sync::{mpsc, oneshot};
61
62use crate::client::{AppendOnlyUpdate, StatusUpdate, TableData};
63use crate::statistics::WebhookStatistics;
64
/// Identifies an introspection collection: a built-in collection whose contents
/// are generated internally (by the storage controller, the compute controller,
/// or the adapter) rather than ingested from user data.
///
/// Collections backed by [`DataSource::Introspection`] carry one of these
/// values, and it selects the target collection in methods such as
/// [`StorageController::append_introspection_updates`] and
/// [`StorageController::update_introspection_collection`].
///
/// NOTE: the derived `PartialOrd`/`Ord` impls follow declaration order, so
/// reordering variants changes how they compare.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, PartialOrd, Ord)]
pub enum IntrospectionType {
    /// We're not responsible for appending to this collection automatically, but we should
    /// automatically bump the write frontier from time to time.
    SinkStatusHistory,
    SourceStatusHistory,
    ShardMapping,

    Frontiers,
    ReplicaFrontiers,

    ReplicaStatusHistory,
    ReplicaMetricsHistory,
    WallclockLagHistory,
    WallclockLagHistogram,

    // Note that this single-shard introspection source will be changed to per-replica,
    // once we allow multiplexing multiple sources/sinks on a single cluster.
    StorageSourceStatistics,
    StorageSinkStatistics,

    // The below are for statement logging.
    StatementExecutionHistory,
    SessionHistory,
    PreparedStatementHistory,
    SqlText,
    // For statement lifecycle logging, which is closely related
    // to statement logging
    StatementLifecycleHistory,

    // Collections written by the compute controller.
    ComputeDependencies,
    ComputeOperatorHydrationStatus,
    ComputeMaterializedViewRefreshes,
    ComputeErrorCounts,
    ComputeHydrationTimes,

    // Written by the Adapter for tracking AWS PrivateLink Connection Status History
    PrivatelinkConnectionStatusHistory,
}
105
/// Describes how data is written to the collection.
///
/// Of these, only [`DataSource::Table`] collections are written through
/// txn-wal (see [`DataSource::in_txns`]).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DataSource<T> {
    /// Ingest data from some external source.
    Ingestion(IngestionDescription),
    /// This source receives its data from the identified ingestion,
    /// from an external object identified using `SourceExportDetails`.
    ///
    /// The referenced ingestion must be created before all of its exports.
    IngestionExport {
        /// The ID of the ingestion this export receives its data from.
        ingestion_id: GlobalId,
        /// Identifies the external object, within the ingestion, that feeds
        /// this export.
        details: SourceExportDetails,
        /// The data configuration of this export. It can be changed later via
        /// [`StorageController::alter_ingestion_export_data_configs`].
        data_config: SourceExportDataConfig,
    },
    /// Data comes from introspection sources, which the controller itself is
    /// responsible for generating.
    Introspection(IntrospectionType),
    /// Data comes from the source's remapping/reclock operator.
    Progress,
    /// Data comes from external HTTP requests pushed to Materialize.
    Webhook,
    /// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
    Table,
    /// This source's data does not need to be managed by the storage
    /// controller, e.g. it's a materialized view or the catalog collection.
    Other,
    /// This collection is the output collection of a sink.
    Sink {
        /// The description of the sink (export) associated with this collection.
        desc: ExportDescription<T>,
    },
}
135
/// Describes a request to create a collection.
///
/// This covers every kind of collection listed in [`DataSource`] (sources,
/// tables, sink outputs, introspection collections, ...), not only sources.
/// [`CollectionDescription::for_other`] and [`CollectionDescription::for_table`]
/// build the common cases.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollectionDescription<T> {
    /// The schema of this collection
    pub desc: RelationDesc,
    /// The source of this collection's data.
    pub data_source: DataSource<T>,
    /// An optional frontier to which the collection's `since` should be advanced.
    pub since: Option<Antichain<T>>,
    /// The timeline of the source. Absent for materialized views, continual tasks, etc.
    pub timeline: Option<Timeline>,
    /// The primary collection of this collection, if any.
    ///
    /// Multiple storage collections can point to the same persist shard,
    /// possibly with different schemas. In such a configuration, we select one
    /// of the involved collections as the primary, who "owns" the persist
    /// shard. All other involved collections have a dependency on the primary.
    pub primary: Option<GlobalId>,
}
155
156impl<T> CollectionDescription<T> {
157    /// Create a CollectionDescription for [`DataSource::Other`].
158    pub fn for_other(desc: RelationDesc, since: Option<Antichain<T>>) -> Self {
159        Self {
160            desc,
161            data_source: DataSource::Other,
162            since,
163            timeline: None,
164            primary: None,
165        }
166    }
167
168    /// Create a CollectionDescription for a table.
169    pub fn for_table(desc: RelationDesc) -> Self {
170        Self {
171            desc,
172            data_source: DataSource::Table,
173            since: None,
174            timeline: Some(Timeline::EpochMilliseconds),
175            primary: None,
176        }
177    }
178}
179
/// Describes a storage export (a sink) together with the instance it runs on.
///
/// This is the payload of [`DataSource::Sink`] and the argument to
/// [`StorageController::alter_export`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExportDescription<T = mz_repr::Timestamp> {
    /// The description of the sink.
    ///
    /// NOTE(review): the first type parameter of `StorageSinkDesc` is `()` here;
    /// presumably a placeholder for metadata attached elsewhere — confirm.
    pub sink: StorageSinkDesc<(), T>,
    /// The ID of the instance in which to install the export.
    pub instance_id: StorageInstanceId,
}
186
/// A response produced by [`StorageController::process`].
#[derive(Debug)]
pub enum Response<T> {
    /// Updated frontiers for the identified collections.
    ///
    /// NOTE(review): which frontier is reported (presumably the write frontier,
    /// i.e. the upper) is not visible here — confirm against the implementation.
    FrontierUpdates(Vec<(GlobalId, Antichain<T>)>),
}
191
/// Metadata that the storage controller must know to properly handle the life
/// cycle of creating and dropping collections.
///
/// This data should be kept consistent with the state modified using
/// [`StorageTxn`].
///
/// n.b. the "txn WAL shard" is also metadata that's persisted, but if we
/// included it in this struct it would never be read.
#[derive(Debug, Clone, Serialize, Default)]
pub struct StorageMetadata {
    /// The persist shard backing each collection, keyed by collection ID.
    ///
    /// Counterpart of [`StorageTxn::get_collection_metadata`]. Serialization
    /// goes through `map_key_to_string`, which (per its name) renders the
    /// `GlobalId` keys as strings.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collection_metadata: BTreeMap<GlobalId, ShardId>,
    /// Shards that are no longer in use by an active collection but are yet to
    /// be finalized. Counterpart of [`StorageTxn::get_unfinalized_shards`].
    pub unfinalized_shards: BTreeSet<ShardId>,
}
206
207impl StorageMetadata {
208    pub fn get_collection_shard<T>(&self, id: GlobalId) -> Result<ShardId, StorageError<T>> {
209        let shard_id = self
210            .collection_metadata
211            .get(&id)
212            .ok_or(StorageError::IdentifierMissing(id))?;
213
214        Ok(*shard_id)
215    }
216}
217
/// Provides an interface for the storage controller to read and write data that
/// is recorded elsewhere.
///
/// Data written to the implementor of this trait should make a consistent view
/// of the data available through [`StorageMetadata`].
///
/// `T` is the timestamp type carried by the [`StorageError`]s returned from the
/// fallible methods.
///
/// NOTE(review): none of the methods below are `async`, so the `#[async_trait]`
/// attribute appears to be unnecessary — confirm no implementor depends on it
/// before removing it.
#[async_trait]
pub trait StorageTxn<T> {
    /// Retrieve all of the visible storage metadata.
    ///
    /// The value of this map should be treated as opaque.
    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId>;

    /// Add new storage metadata for a collection.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must include
    /// this data.
    fn insert_collection_metadata(
        &mut self,
        s: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>>;

    /// Remove the metadata associated with the identified collections.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must not
    /// include these keys.
    ///
    /// NOTE(review): the returned pairs are presumably the entries that were
    /// actually removed — confirm against implementors.
    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)>;

    /// Retrieve all of the shards that are no longer in use by an active
    /// collection but are yet to be finalized.
    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId>;

    /// Insert the specified values as unfinalized shards.
    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError<T>>;

    /// Mark the specified shards as finalized, deleting them from the
    /// unfinalized shard collection (i.e. from what
    /// [`StorageTxn::get_unfinalized_shards`] returns).
    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>);

    /// Get the txn WAL shard for this environment if it exists.
    fn get_txn_wal_shard(&self) -> Option<ShardId>;

    /// Store the specified shard as the environment's txn WAL shard.
    ///
    /// The implementor should error if the shard is already specified.
    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError<T>>;
}
264
/// A heap-allocated, type-erased future resolving to `T` that is `Send` and
/// `'static`, so it can be stored or moved across threads.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

/// A predicate for a `Row` filter; used by [`StorageWriteOp::Delete`], where
/// rows for which it returns `true` are deleted.
pub type RowPredicate = Box<dyn Fn(&Row) -> bool + Send + Sync>;
269
/// High-level write operations applicable to storage collections.
///
/// Passed to e.g. [`StorageController::update_introspection_collection`].
///
/// NOTE: this type derives nothing because [`RowPredicate`] is a boxed closure,
/// which implements neither `Debug` nor `Clone`.
pub enum StorageWriteOp {
    /// Append a set of rows with specified multiplicities.
    ///
    /// The multiplicities may be negative, so an `Append` operation can perform
    /// both insertions and retractions.
    Append {
        /// `(row, multiplicity)` pairs to append.
        updates: Vec<(Row, Diff)>,
    },
    /// Delete all rows matching the given predicate.
    Delete {
        /// Rows for which this predicate returns `true` are deleted.
        filter: RowPredicate,
    },
}
280
281impl StorageWriteOp {
282    /// Returns whether this operation appends an empty set of updates.
283    pub fn is_empty_append(&self) -> bool {
284        match self {
285            Self::Append { updates } => updates.is_empty(),
286            Self::Delete { .. } => false,
287        }
288    }
289}
290
/// The interface to the storage controller.
///
/// Responses from the storage layer are consumed by awaiting
/// [`StorageController::ready`] and then calling [`StorageController::process`].
///
/// The `async` methods are desugared by `async_trait` with `?Send`, i.e. the
/// futures they return are not required to be `Send`.
#[async_trait(?Send)]
pub trait StorageController: Debug {
    /// The timestamp type used for the frontiers, writes, and errors of this
    /// controller.
    type Timestamp: TimelyTimestamp;

    /// Marks the end of any initialization commands.
    ///
    /// The implementor may wait for this method to be called before implementing prior commands,
    /// and so it is important for a user to invoke this method as soon as it is comfortable.
    /// This method can be invoked immediately, at the potential expense of performance.
    fn initialization_complete(&mut self);

    /// Update storage configuration with new parameters.
    fn update_parameters(&mut self, config_params: StorageParameters);

    /// Get the current configuration, including parameters updated with `update_parameters`.
    fn config(&self) -> &StorageConfiguration;

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Returns `true` iff the given collection/ingestion has been hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns `true` if each non-transient, non-excluded collection is
    /// hydrated on at least one of the provided replicas.
    ///
    /// If no replicas are provided, this checks for hydration on _any_ replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    ///
    /// Despite its plural name, `target_cluster_ids` is a single
    /// [`StorageInstanceId`]; presumably the instance whose collections are
    /// checked. Collections in `exclude_collections` are not checked.
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_ids: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns the since/upper frontiers of the identified collection.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), CollectionMissing>;

    /// Returns the since/upper frontiers of the identified collections.
    ///
    /// Having a method that returns both frontiers at the same time, for all
    /// requested collections, ensures that we can get a consistent "snapshot"
    /// of collection state. If we had separate methods instead, and/or would
    /// allow getting frontiers for collections one at a time, it could happen
    /// that collection state changes concurrently, while information is
    /// gathered.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<
        Vec<(
            GlobalId,
            Antichain<Self::Timestamp>,
            Antichain<Self::Timestamp>,
        )>,
        CollectionMissing,
    >;

    /// Returns the [CollectionMetadata] of all active collections, paired with
    /// their IDs.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the IDs of ingestion exports running on the given instance. This
    /// includes the ingestion itself, if any, and running source tables (aka.
    /// subsources).
    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Creates a storage instance with the specified ID.
    ///
    /// A storage instance can have zero or one replicas. The instance is
    /// created with zero replicas.
    ///
    /// Panics if a storage instance with the given ID already exists.
    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>);

    /// Drops the storage instance with the given ID.
    ///
    /// If you call this method while the storage instance has a replica
    /// attached, that replica will be leaked. Call `drop_replica` first.
    ///
    /// Panics if a storage instance with the given ID does not exist.
    fn drop_instance(&mut self, id: StorageInstanceId);

    /// Updates a storage instance's workload class.
    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    );

    /// Connects the storage instance to the specified replica.
    ///
    /// If the storage instance is already attached to a replica, communication
    /// with that replica is severed in favor of the new replica.
    ///
    /// In the future, this API will be adjusted to support active replication
    /// of storage instances (i.e., multiple replicas attached to a given
    /// storage instance).
    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    );

    /// Disconnects the storage instance from the specified replica.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId);

    /// Across versions of Materialize the nullability of columns for some objects can change based
    /// on updates to our optimizer.
    ///
    /// During bootstrap we will register these new schemas with Persist.
    ///
    /// See: <https://github.com/MaterializeInc/database-issues/issues/2488>
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Creates the collections described by `collections`.
    ///
    /// Each entry pairs a collection's ID with a [`CollectionDescription`] carrying its schema,
    /// data source, and any associated metadata needed to populate it.
    ///
    /// This command installs collection state for the indicated collections, and they are
    /// now valid to use in queries at times beyond the initial `since` frontiers. Each
    /// collection also acquires a read capability at this frontier, which will need to
    /// be repeatedly downgraded with `allow_compaction()` to permit compaction.
    /// NOTE(review): `allow_compaction()` is not a method of this trait; confirm which API is
    /// meant.
    ///
    /// This method is NOT idempotent; it can fail between processing of different
    /// collections and leave the controller in an inconsistent state. It is almost
    /// always wrong to do anything but abort the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are available for reads. (We
    /// might later give non-tables the same treatment, but hold off on that initially.) Callers
    /// must provide a Some if any of the collections is a table. A None may be given if none of the
    /// collections are a table (i.e. all materialized views, sources, etc).
    ///
    /// The default implementation delegates to [`Self::create_collections_for_bootstrap`] with
    /// an empty set of migrated storage collections.
    async fn create_collections(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.create_collections_for_bootstrap(
            storage_metadata,
            register_ts,
            collections,
            &BTreeSet::new(),
        )
        .await
    }

    /// Like [`Self::create_collections`], except used specifically for bootstrap.
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Check that the ingestion associated with `id` can use the provided
    /// [`SourceDesc`].
    ///
    /// Note that this check is optimistic and its return of `Ok(())` does not
    /// guarantee that subsequent calls to `alter_ingestion_source_desc` are
    /// guaranteed to succeed.
    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters each identified collection to use the correlated [`GenericSourceConnection`].
    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the data config for the specified source exports of the specified ingestions.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the table `existing_collection` to use the schema `new_desc`, registered under
    /// the new collection ID `new_collection`.
    ///
    /// NOTE(review): presumably `expected_version` is the [`RelationVersion`] the new schema is
    /// expected to get, and `register_ts` is the timestamp at which `new_collection` becomes
    /// readable (cf. `register_ts` on [`Self::create_collections`]) — confirm against the
    /// implementation.
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Acquire an immutable reference to the export state, should it exist.
    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Acquire a mutable reference to the export state, should it exist.
    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Create a oneshot ingestion.
    ///
    /// `ingestion_id` identifies the ingestion (e.g. for
    /// [`Self::cancel_oneshot_ingestion`]). The outcome is delivered through `result_tx`.
    /// NOTE(review): presumably it receives the staged [`ProtoBatch`]es for `collection_id`
    /// — confirm.
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Cancel the oneshot ingestion identified by `ingestion_id`, i.e. the ID passed to
    /// [`Self::create_oneshot_ingestion`].
    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alter the sink identified by the given id to match the provided `ExportDescription`.
    async fn alter_export(
        &mut self,
        id: GlobalId,
        export: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// For each identified export, alter its [`StorageSinkConnection`].
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the tables and allows their resources to be reclaimed.
    ///
    /// NOTE(review): `ts` is presumably the timestamp at which the drop takes effect — confirm.
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    ///     controller starts/restarts it has no durable state. That means that it has no way of
    ///     remembering any past commands sent. In the future we plan on persisting state for the
    ///     controller so that it is aware of past commands.
    ///     Therefore this method is for dropping sinks that we know to have been previously
    ///     created, but have been forgotten by the controller due to a restart.
    ///     Once command history becomes durable we can remove this method and use the normal
    ///     `drop_sinks`.
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    ///     controller starts/restarts it has no durable state. That means that it has no way of
    ///     remembering any past commands sent. In the future we plan on persisting state for the
    ///     controller so that it is aware of past commands.
    ///     Therefore this method is for dropping sources that we know to have been previously
    ///     created, but have been forgotten by the controller due to a restart.
    ///     Once command history becomes durable we can remove this method and use the normal
    ///     `drop_sources`.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Appends the [`TableData`] in `commands` to the identified collections at `write_ts`,
    /// and advances the upper to `advance_to`.
    ///
    /// The method returns a oneshot that can be awaited to indicate completion of the write.
    /// The method may return an error, indicating an immediately visible error, and also the
    /// oneshot may return an error if one is encountered during the write.
    ///
    /// All updates in `commands` are applied atomically.
    // TODO(petrosagg): switch upper to `Antichain<Timestamp>`
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;

    /// Returns a [`MonotonicAppender`] which is a channel that can be used to monotonically
    /// append to the specified [`GlobalId`].
    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Returns a shared [`WebhookStatistics`] which can be used to report user-facing
    /// statistics for this given webhook, specified by the [`GlobalId`].
    ///
    // This is used to support a fairly special case, where a source needs to report statistics
    // from outside the ordinary controller-clusterd path. It's possible to merge this with
    // `monotonic_appender`, whose only current user is webhooks, but given that they will
    // likely be moved to clusterd, we just leave this a special case.
    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>;

    /// Waits until the controller is ready to process a response.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the owner should call
    /// [`StorageController::process`] to process the ready message.
    ///
    /// This method is cancellation safe.
    async fn ready(&mut self);

    /// Processes the work queued by [`StorageController::ready`].
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<Self::Timestamp>>, anyhow::Error>;

    /// Exposes the internal state of the data shard for debugging and QA.
    ///
    /// We'll be thoughtful about making unnecessary changes, but the **output
    /// of this method needs to be gated from users**, so that it's not subject
    /// to our backward compatibility guarantees.
    ///
    /// TODO: Ideally this would return `impl Serialize` so the caller can do
    /// with it what they like, but that doesn't work in traits yet. The
    /// workaround (an associated type) doesn't work because persist doesn't
    /// want to make the type public. In the meantime, move the `serde_json`
    /// call from the single user into this method.
    async fn inspect_persist_state(&self, id: GlobalId)
    -> Result<serde_json::Value, anyhow::Error>;

    /// Records append-only updates for the given introspection type.
    ///
    /// Rows passed in `updates` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn append_introspection_updates(&mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>);

    /// Records append-only status updates for the given introspection type.
    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    );

    /// Updates the desired state of the given introspection type.
    ///
    /// Rows passed in `op` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp);

    /// Returns a sender for updates to the specified append-only introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with an append-only collection.
    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;

    /// Returns a sender for updates to the specified differential introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with a differential collection.
    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;

    /// Requests a real-time recent timestamp for the collections in `source_ids`.
    ///
    /// An immediately detectable error is returned as the outer `Err`. Otherwise the returned
    /// future resolves to the timestamp or to an error.
    /// NOTE(review): presumably `timeout` bounds how long that future may take — confirm.
    async fn real_time_recent_timestamp(
        &self,
        source_ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;

    /// Returns the state of the [`StorageController`] formatted as JSON.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}
730
731impl<T> DataSource<T> {
732    /// Returns true if the storage controller manages the data shard for this
733    /// source using txn-wal.
734    pub fn in_txns(&self) -> bool {
735        match self {
736            DataSource::Table => true,
737            DataSource::Other
738            | DataSource::Ingestion(_)
739            | DataSource::IngestionExport { .. }
740            | DataSource::Introspection(_)
741            | DataSource::Progress
742            | DataSource::Webhook => false,
743            DataSource::Sink { .. } => false,
744        }
745    }
746}
747
/// A wrapper struct that presents the adapter token in a format that is understandable by persist
/// and also allows us to differentiate between a token being present versus being set for the
/// first time.
///
/// `None` means no epoch has been set. The `Codec64` impl encodes it as `0`, which cannot
/// collide with a real epoch because a set epoch is always a [`NonZeroI64`].
#[derive(PartialEq, Clone, Debug)]
pub struct PersistEpoch(pub Option<NonZeroI64>);
753
754impl Opaque for PersistEpoch {
755    fn initial() -> Self {
756        PersistEpoch(None)
757    }
758}
759
760impl Codec64 for PersistEpoch {
761    fn codec_name() -> String {
762        "PersistEpoch".to_owned()
763    }
764
765    fn encode(&self) -> [u8; 8] {
766        self.0.map(NonZeroI64::get).unwrap_or(0).to_le_bytes()
767    }
768
769    fn decode(buf: [u8; 8]) -> Self {
770        Self(NonZeroI64::new(i64::from_le_bytes(buf)))
771    }
772}
773
774impl From<NonZeroI64> for PersistEpoch {
775    fn from(epoch: NonZeroI64) -> Self {
776        Self(Some(epoch))
777    }
778}
779
/// State maintained about individual exports.
#[derive(Debug)]
pub struct ExportState<T: TimelyTimestamp> {
    /// Really only for keeping track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<T>,

    /// The cluster this export is associated with.
    pub cluster_id: StorageInstanceId,

    /// The current since frontier.
    ///
    /// `ExportState::new` initializes it to the join of the sinces of `read_holds`.
    /// NOTE(review): presumably it is later advanced from `write_frontier` via `read_policy`
    /// (there is no `hold_policy` field) — confirm against the controller's update path.
    pub derived_since: Antichain<T>,

    /// The read holds that this export has on its dependencies (its input and itself). When
    /// the upper of the export changes, we downgrade this, which in turn
    /// downgrades holds we have on our dependencies' sinces.
    ///
    /// Index 0 is the hold on the input (returned by `ExportState::input_hold`); index 1 is
    /// the hold on the export itself.
    pub read_holds: [ReadHold<T>; 2],

    /// The policy to use to downgrade `self.read_capabilities`.
    pub read_policy: ReadPolicy<T>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,
}
804
805impl<T: Timestamp> ExportState<T> {
806    pub fn new(
807        cluster_id: StorageInstanceId,
808        read_hold: ReadHold<T>,
809        self_hold: ReadHold<T>,
810        write_frontier: Antichain<T>,
811        read_policy: ReadPolicy<T>,
812    ) -> Self
813    where
814        T: Lattice,
815    {
816        let mut dependency_since = Antichain::from_elem(T::minimum());
817        for read_hold in [&read_hold, &self_hold] {
818            dependency_since.join_assign(read_hold.since());
819        }
820        Self {
821            read_capabilities: MutableAntichain::from(dependency_since.borrow()),
822            cluster_id,
823            derived_since: dependency_since,
824            read_holds: [read_hold, self_hold],
825            read_policy,
826            write_frontier,
827        }
828    }
829
830    /// Returns the cluster to which the export is bound.
831    pub fn cluster_id(&self) -> StorageInstanceId {
832        self.cluster_id
833    }
834
835    /// Returns the cluster to which the export is bound.
836    pub fn input_hold(&self) -> &ReadHold<T> {
837        &self.read_holds[0]
838    }
839
840    /// Returns whether the export was dropped.
841    pub fn is_dropped(&self) -> bool {
842        self.read_holds.iter().all(|h| h.since().is_empty())
843    }
844}
/// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`].
///
/// See `CollectionManager::monotonic_appender` to acquire a [`MonotonicAppender`].
#[derive(Clone, Debug)]
pub struct MonotonicAppender<T> {
    /// Channel that sends to a [`tokio::task`] which pushes updates to Persist.
    ///
    /// Each message pairs a batch of updates with a oneshot sender on which the receiving
    /// task is expected to report the outcome of the append (see `MonotonicAppender::append`).
    tx: mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,
}
856
857impl<T> MonotonicAppender<T> {
858    pub fn new(
859        tx: mpsc::UnboundedSender<(
860            Vec<AppendOnlyUpdate>,
861            oneshot::Sender<Result<(), StorageError<T>>>,
862        )>,
863    ) -> Self {
864        MonotonicAppender { tx }
865    }
866
867    pub async fn append(&self, updates: Vec<AppendOnlyUpdate>) -> Result<(), StorageError<T>> {
868        let (tx, rx) = oneshot::channel();
869
870        // Send our update to the CollectionManager.
871        self.tx
872            .send((updates, tx))
873            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
874
875        // Wait for a response, if we fail to receive then the CollectionManager has gone away.
876        let result = rx
877            .await
878            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
879
880        result
881    }
882}
883
/// A wallclock lag measurement.
///
/// The enum representation reflects the fact that wallclock lag is undefined for unreadable
/// collections, i.e. collections that contain no readable times.
///
/// The derived `PartialOrd`/`Ord` order variants by declaration order, so every `Seconds(_)`
/// compares less than `Undefined`. This agrees with `WallclockLag::max`, which treats
/// `Undefined` as the greatest lag, so keep `Undefined` declared last.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum WallclockLag {
    /// Lag value in seconds, for readable collections.
    Seconds(u64),
    /// Undefined lag, for unreadable collections.
    Undefined,
}
895
896impl WallclockLag {
897    /// The smallest possible wallclock lag measurement.
898    pub const MIN: Self = Self::Seconds(0);
899
900    /// Return the maximum of two lag values.
901    ///
902    /// We treat `Undefined` lags as greater than `Seconds`, to ensure we never report low lag
903    /// values when a collection was actually unreadable for some amount of time.
904    pub fn max(self, other: Self) -> Self {
905        match (self, other) {
906            (Self::Seconds(a), Self::Seconds(b)) => Self::Seconds(a.max(b)),
907            (Self::Undefined, _) | (_, Self::Undefined) => Self::Undefined,
908        }
909    }
910
911    /// Return the wrapped seconds value, or a default if the lag is `Undefined`.
912    pub fn unwrap_seconds_or(self, default: u64) -> u64 {
913        match self {
914            Self::Seconds(s) => s,
915            Self::Undefined => default,
916        }
917    }
918
919    /// Create a new `WallclockLag` by transforming the wrapped seconds value.
920    pub fn map_seconds(self, f: impl FnOnce(u64) -> u64) -> Self {
921        match self {
922            Self::Seconds(s) => Self::Seconds(f(s)),
923            Self::Undefined => Self::Undefined,
924        }
925    }
926
927    /// Convert this lag value into a [`Datum::Interval`] or [`Datum::Null`].
928    pub fn into_interval_datum(self) -> Datum<'static> {
929        match self {
930            Self::Seconds(secs) => {
931                let micros = i64::try_from(secs * 1_000_000).expect("must fit");
932                Datum::Interval(Interval::new(0, 0, micros))
933            }
934            Self::Undefined => Datum::Null,
935        }
936    }
937
938    /// Convert this lag value into a [`Datum::UInt64`] or [`Datum::Null`].
939    pub fn into_uint64_datum(self) -> Datum<'static> {
940        match self {
941            Self::Seconds(secs) => Datum::UInt64(secs),
942            Self::Undefined => Datum::Null,
943        }
944    }
945}
946
/// The period covered by a wallclock lag histogram, represented as a `[start, end)` range.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WallclockLagHistogramPeriod {
    /// Inclusive start of the period.
    pub start: CheckedTimestamp<DateTime<Utc>>,
    /// Exclusive end of the period.
    pub end: CheckedTimestamp<DateTime<Utc>>,
}
953
954impl WallclockLagHistogramPeriod {
955    /// Construct a `WallclockLagHistogramPeriod` from the given epoch timestamp and dyncfg.
956    pub fn from_epoch_millis(epoch_ms: u64, dyncfg: &ConfigSet) -> Self {
957        let interval = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.get(dyncfg);
958        let interval_ms = u64::try_from(interval.as_millis()).unwrap_or_else(|_| {
959            soft_panic_or_log!("excessive wallclock lag histogram period interval: {interval:?}");
960            let default = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.default();
961            u64::try_from(default.as_millis()).unwrap()
962        });
963        let interval_ms = std::cmp::max(interval_ms, 1);
964
965        let start_ms = epoch_ms - (epoch_ms % interval_ms);
966        let start_dt = mz_ore::now::to_datetime(start_ms);
967        let start = start_dt.try_into().expect("must fit");
968
969        let end_ms = start_ms + interval_ms;
970        let end_dt = mz_ore::now::to_datetime(end_ms);
971        let end = end_dt.try_into().expect("must fit");
972
973        Self { start, end }
974    }
975}
976
#[cfg(test)]
mod tests {
    use super::*;

    /// Lagging writes by zero must yield a read frontier equal to the write frontier.
    #[mz_ore::test]
    fn lag_writes_by_zero() {
        let zero = mz_repr::Timestamp::default;
        let policy = ReadPolicy::lag_writes_by(zero(), zero());

        let upper = Antichain::from_elem(mz_repr::Timestamp::from(5));
        let since = policy.frontier(upper.borrow());
        assert_eq!(since, upper);
    }
}