Skip to main content

mz_storage_client/
controller.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller that provides an interface to the storage layer.
11//!
12//! The storage controller curates the creation of sources, the progress of readers through these collections,
13//! and their eventual dropping and resource reclamation.
14//!
15//! The storage controller can be viewed as a partial map from `GlobalId` to collection. It is an error to
16//! use an identifier before it has been "created" with `create_source()`. Once created, the controller holds
17//! a read capability for each source, which is manipulated with `update_read_capabilities()`.
18//! Eventually, the source is dropped with either `drop_sources()` or by allowing compaction to the
19//! empty frontier.
20
21use std::collections::{BTreeMap, BTreeSet};
22use std::fmt::Debug;
23use std::future::Future;
24use std::num::NonZeroI64;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::time::Duration;
28
29use async_trait::async_trait;
30use chrono::{DateTime, Utc};
31use differential_dataflow::lattice::Lattice;
32use mz_cluster_client::ReplicaId;
33use mz_cluster_client::client::ClusterReplicaLocation;
34use mz_controller_types::dyncfgs::WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL;
35use mz_dyncfg::ConfigSet;
36use mz_ore::soft_panic_or_log;
37use mz_persist_client::batch::ProtoBatch;
38use mz_persist_types::{Codec64, Opaque, ShardId};
39use mz_repr::adt::interval::Interval;
40use mz_repr::adt::timestamp::CheckedTimestamp;
41use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row};
42use mz_storage_types::configuration::StorageConfiguration;
43use mz_storage_types::connections::inline::InlinedConnection;
44use mz_storage_types::controller::{CollectionMetadata, StorageError};
45use mz_storage_types::errors::CollectionMissing;
46use mz_storage_types::instances::StorageInstanceId;
47use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
48use mz_storage_types::parameters::StorageParameters;
49use mz_storage_types::read_holds::ReadHold;
50use mz_storage_types::read_policy::ReadPolicy;
51use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
52use mz_storage_types::sources::{
53    GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
54    SourceExportDetails, Timeline,
55};
56use serde::{Deserialize, Serialize};
57use timely::progress::Timestamp as TimelyTimestamp;
58use timely::progress::frontier::MutableAntichain;
59use timely::progress::{Antichain, Timestamp};
60use tokio::sync::{mpsc, oneshot};
61
62use crate::client::{AppendOnlyUpdate, StatusUpdate, TableData};
63use crate::statistics::WebhookStatistics;
64
/// Identifies a built-in introspection collection managed through the storage
/// controller.
///
/// Grouped below by the component responsible for writing to the collection.
#[derive(
    Clone,
    Copy,
    Debug,
    Serialize,
    Deserialize,
    Eq,
    PartialEq,
    Hash,
    PartialOrd,
    Ord
)]
pub enum IntrospectionType {
    /// We're not responsible for appending to this collection automatically, but we should
    /// automatically bump the write frontier from time to time.
    SinkStatusHistory,
    SourceStatusHistory,
    ShardMapping,

    Frontiers,
    ReplicaFrontiers,

    ReplicaStatusHistory,
    ReplicaMetricsHistory,
    WallclockLagHistory,
    WallclockLagHistogram,

    // Note that this single-shard introspection source will be changed to per-replica,
    // once we allow multiplexing multiple sources/sinks on a single cluster.
    StorageSourceStatistics,
    StorageSinkStatistics,

    // The below are for statement logging.
    StatementExecutionHistory,
    SessionHistory,
    PreparedStatementHistory,
    SqlText,
    // For statement lifecycle logging, which is closely related
    // to statement logging.
    StatementLifecycleHistory,

    // Collections written by the compute controller.
    ComputeDependencies,
    ComputeOperatorHydrationStatus,
    ComputeMaterializedViewRefreshes,
    ComputeErrorCounts,
    ComputeHydrationTimes,

    // Written by the Adapter for tracking AWS PrivateLink Connection Status History.
    PrivatelinkConnectionStatusHistory,
}
116
/// Describes how data is written to the collection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DataSource<T> {
    /// Ingest data from some external source.
    Ingestion(IngestionDescription),
    /// This source receives its data from the identified ingestion,
    /// from an external object identified using `SourceExportDetails`.
    ///
    /// The referenced ingestion must be created before all of its exports.
    IngestionExport {
        /// The ingestion that produces this export's data.
        ingestion_id: GlobalId,
        /// Identifies the external object backing this export.
        details: SourceExportDetails,
        /// Data configuration specific to this export.
        data_config: SourceExportDataConfig,
    },
    /// Data comes from introspection sources, which the controller itself is
    /// responsible for generating.
    Introspection(IntrospectionType),
    /// Data comes from the source's remapping/reclock operator.
    Progress,
    /// Data comes from external HTTP requests pushed to Materialize.
    Webhook,
    /// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
    Table,
    /// This source's data does not need to be managed by the storage
    /// controller, e.g. it's a materialized view or the catalog collection.
    Other,
    /// This collection is the output collection of a sink.
    Sink { desc: ExportDescription<T> },
}
146
/// Describes a request to create a source.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollectionDescription<T> {
    /// The schema of this collection.
    pub desc: RelationDesc,
    /// The source of this collection's data.
    pub data_source: DataSource<T>,
    /// An optional frontier to which the collection's `since` should be advanced.
    pub since: Option<Antichain<T>>,
    /// The timeline of the source. Absent for materialized views, continual tasks, etc.
    pub timeline: Option<Timeline>,
    /// The primary of this collection.
    ///
    /// Multiple storage collections can point to the same persist shard,
    /// possibly with different schemas. In such a configuration, we select one
    /// of the involved collections as the primary, who "owns" the persist
    /// shard. All other involved collections have a dependency on the primary.
    pub primary: Option<GlobalId>,
}
166
167impl<T> CollectionDescription<T> {
168    /// Create a CollectionDescription for [`DataSource::Other`].
169    pub fn for_other(desc: RelationDesc, since: Option<Antichain<T>>) -> Self {
170        Self {
171            desc,
172            data_source: DataSource::Other,
173            since,
174            timeline: None,
175            primary: None,
176        }
177    }
178
179    /// Create a CollectionDescription for a table.
180    pub fn for_table(desc: RelationDesc) -> Self {
181        Self {
182            desc,
183            data_source: DataSource::Table,
184            since: None,
185            timeline: Some(Timeline::EpochMilliseconds),
186            primary: None,
187        }
188    }
189}
190
/// Describes an export (sink) to install on a storage instance.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExportDescription<T = mz_repr::Timestamp> {
    /// The description of the sink itself.
    pub sink: StorageSinkDesc<(), T>,
    /// The ID of the instance in which to install the export.
    pub instance_id: StorageInstanceId,
}
197
/// Responses produced by the storage controller, surfaced to the caller via
/// [`StorageController::process`].
#[derive(Debug)]
pub enum Response<T> {
    /// New upper frontiers for the identified collections.
    FrontierUpdates(Vec<(GlobalId, Antichain<T>)>),
}
202
/// Metadata that the storage controller must know to properly handle the life
/// cycle of creating and dropping collections.
///
/// This data should be kept consistent with the state modified using
/// [`StorageTxn`].
///
/// n.b. the "txn WAL shard" is also metadata that's persisted, but if we
/// included it in this struct it would never be read.
#[derive(Debug, Clone, Serialize, Default)]
pub struct StorageMetadata {
    /// Maps each collection to the persist shard that backs it.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collection_metadata: BTreeMap<GlobalId, ShardId>,
    /// Shards no longer in use by any active collection but not yet finalized.
    pub unfinalized_shards: BTreeSet<ShardId>,
}
217
218impl StorageMetadata {
219    pub fn get_collection_shard<T>(&self, id: GlobalId) -> Result<ShardId, StorageError<T>> {
220        let shard_id = self
221            .collection_metadata
222            .get(&id)
223            .ok_or(StorageError::IdentifierMissing(id))?;
224
225        Ok(*shard_id)
226    }
227}
228
/// Provides an interface for the storage controller to read and write data that
/// is recorded elsewhere.
///
/// Data written to the implementor of this trait should make a consistent view
/// of the data available through [`StorageMetadata`].
#[async_trait]
pub trait StorageTxn<T> {
    /// Retrieve all of the visible storage metadata.
    ///
    /// The value of this map should be treated as opaque.
    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId>;

    /// Add new storage metadata for a collection.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must include
    /// this data.
    fn insert_collection_metadata(
        &mut self,
        s: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>>;

    /// Remove the metadata associated with the identified collections.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must not
    /// include these keys.
    ///
    /// Returns the entries that were removed.
    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)>;

    /// Retrieve all of the shards that are no longer in use by an active
    /// collection but are yet to be finalized.
    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId>;

    /// Insert the specified values as unfinalized shards.
    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError<T>>;

    /// Mark the specified shards as finalized, deleting them from the
    /// unfinalized shard collection.
    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>);

    /// Get the txn WAL shard for this environment if it exists.
    fn get_txn_wal_shard(&self) -> Option<ShardId>;

    /// Store the specified shard as the environment's txn WAL shard.
    ///
    /// The implementor should error if the shard is already specified.
    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError<T>>;
}
275
/// A boxed, pinned, `Send`able future, as returned by controller APIs that
/// hand back asynchronous results.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

/// A predicate for a `Row` filter.
pub type RowPredicate = Box<dyn Fn(&Row) -> bool + Send + Sync>;
280
/// High-level write operations applicable to storage collections.
pub enum StorageWriteOp {
    /// Append a set of rows with specified multiplicities.
    ///
    /// The multiplicities may be negative, so an `Append` operation can perform
    /// both insertions and retractions.
    Append { updates: Vec<(Row, Diff)> },
    /// Delete all rows matching the given predicate.
    Delete { filter: RowPredicate },
}
291
292impl StorageWriteOp {
293    /// Returns whether this operation appends an empty set of updates.
294    pub fn is_empty_append(&self) -> bool {
295        match self {
296            Self::Append { updates } => updates.is_empty(),
297            Self::Delete { .. } => false,
298        }
299    }
300}
301
#[async_trait(?Send)]
pub trait StorageController: Debug {
    /// The timestamp type used by all collections managed by this controller.
    type Timestamp: TimelyTimestamp;

    /// Marks the end of any initialization commands.
    ///
    /// The implementor may wait for this method to be called before implementing prior commands,
    /// and so it is important for a user to invoke this method as soon as it is comfortable.
    /// This method can be invoked immediately, at the potential expense of performance.
    fn initialization_complete(&mut self);

    /// Update storage configuration with new parameters.
    fn update_parameters(&mut self, config_params: StorageParameters);

    /// Get the current configuration, including parameters updated with `update_parameters`.
    fn config(&self) -> &StorageConfiguration;

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Returns `true` iff the given collection/ingestion has been hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns `true` if each non-transient, non-excluded collection is
    /// hydrated on at least one of the provided replicas.
    ///
    /// If no replicas are provided, this checks for hydration on _any_ replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_ids: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns the since/upper frontiers of the identified collection.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<(Antichain<Self::Timestamp>, Antichain<Self::Timestamp>), CollectionMissing>;

    /// Returns the since/upper frontiers of the identified collections.
    ///
    /// Having a method that returns both frontiers at the same time, for all
    /// requested collections, ensures that we can get a consistent "snapshot"
    /// of collection state. If we had separate methods instead, and/or would
    /// allow getting frontiers for collections one at a time, it could happen
    /// that collection state changes concurrently, while information is
    /// gathered.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<
        Vec<(
            GlobalId,
            Antichain<Self::Timestamp>,
            Antichain<Self::Timestamp>,
        )>,
        CollectionMissing,
    >;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the IDs of ingestion exports running on the given instance. This
    /// includes the ingestion itself, if any, and running source tables (aka.
    /// subsources).
    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Creates a storage instance with the specified ID.
    ///
    /// A storage instance can have zero or one replicas. The instance is
    /// created with zero replicas.
    ///
    /// Panics if a storage instance with the given ID already exists.
    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>);

    /// Drops the storage instance with the given ID.
    ///
    /// If you call this method while the storage instance has a replica
    /// attached, that replica will be leaked. Call `drop_replica` first.
    ///
    /// Panics if a storage instance with the given ID does not exist.
    fn drop_instance(&mut self, id: StorageInstanceId);

    /// Updates a storage instance's workload class.
    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    );

    /// Connects the storage instance to the specified replica.
    ///
    /// If the storage instance is already attached to a replica, communication
    /// with that replica is severed in favor of the new replica.
    ///
    /// In the future, this API will be adjusted to support active replication
    /// of storage instances (i.e., multiple replicas attached to a given
    /// storage instance).
    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    );

    /// Disconnects the storage instance from the specified replica.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId);

    /// Across versions of Materialize the nullability of columns for some objects can change based
    /// on updates to our optimizer.
    ///
    /// During bootstrap we will register these new schemas with Persist.
    ///
    /// See: <https://github.com/MaterializeInc/database-issues/issues/2488>
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Create the sources described in the individual RunIngestionCommand commands.
    ///
    /// Each command carries the source id, the source description, and any associated metadata
    /// needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and they are
    /// now valid to use in queries at times beyond the initial `since` frontiers. Each
    /// collection also acquires a read capability at this frontier, which will need to
    /// be repeatedly downgraded with `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; It can fail between processing of different
    /// collections and leave the controller in an inconsistent state. It is almost
    /// always wrong to do anything but abort the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are available for reads. (We
    /// might later give non-tables the same treatment, but hold off on that initially.) Callers
    /// must provide a Some if any of the collections is a table. A None may be given if none of the
    /// collections are a table (i.e. all materialized views, sources, etc).
    async fn create_collections(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.create_collections_for_bootstrap(
            storage_metadata,
            register_ts,
            collections,
            &BTreeSet::new(),
        )
        .await
    }

    /// Like [`Self::create_collections`], except used specifically for bootstrap.
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Check that the ingestion associated with `id` can use the provided
    /// [`SourceDesc`].
    ///
    /// Note that this check is optimistic and its return of `Ok(())` does not
    /// guarantee that subsequent calls to `alter_ingestion_source_desc` are
    /// guaranteed to succeed.
    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters each identified collection to use the correlated [`GenericSourceConnection`].
    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the data config for the specified source exports of the specified ingestions.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters a table's schema.
    ///
    /// NOTE(review): from the signature this appears to migrate the table from
    /// `existing_collection` to `new_collection` using `new_desc`, registering
    /// the change at `register_ts` after checking `expected_version` — confirm
    /// exact semantics against the implementation.
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Acquire an immutable reference to the export state, should it exist.
    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Acquire a mutable reference to the export state, should it exist.
    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Create a oneshot ingestion.
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Cancel a oneshot ingestion.
    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alter the sink identified by the given id to match the provided `ExportDescription`.
    async fn alter_export(
        &mut self,
        id: GlobalId,
        export: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// For each identified export, alter its [`StorageSinkConnection`].
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the tables and allows their resources to be reclaimed.
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    ///     controller starts/restarts it has no durable state. That means that it has no way of
    ///     remembering any past commands sent. In the future we plan on persisting state for the
    ///     controller so that it is aware of past commands.
    ///     Therefore this method is for dropping sinks that we know to have been previously
    ///     created, but have been forgotten by the controller due to a restart.
    ///     Once command history becomes durable we can remove this method and use the normal
    ///     `drop_sinks`.
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    ///     controller starts/restarts it has no durable state. That means that it has no way of
    ///     remembering any past commands sent. In the future we plan on persisting state for the
    ///     controller so that it is aware of past commands.
    ///     Therefore this method is for dropping sources that we know to have been previously
    ///     created, but have been forgotten by the controller due to a restart.
    ///     Once command history becomes durable we can remove this method and use the normal
    ///     `drop_sources`.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Append `updates` into the local input named `id` and advance its upper to `upper`.
    ///
    /// The method returns a oneshot that can be awaited to indicate completion of the write.
    /// The method may return an error, indicating an immediately visible error, and also the
    /// oneshot may return an error if one is encountered during the write.
    ///
    /// All updates in `commands` are applied atomically.
    // TODO(petrosagg): switch upper to `Antichain<Timestamp>`
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;

    /// Returns a [`MonotonicAppender`] which is a channel that can be used to monotonically
    /// append to the specified [`GlobalId`].
    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Returns a shared [`WebhookStatistics`] which can be used to report user-facing
    /// statistics for this given webhook, specified by the [`GlobalId`].
    ///
    // This is used to support a fairly special case, where a source needs to report statistics
    // from outside the ordinary controller-clusterd path. It's possible to merge this with
    // `monotonic_appender`, whose only current user is webhooks, but given that they will
    // likely be moved to clusterd, we just leave this a special case.
    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>;

    /// Waits until the controller is ready to process a response.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the owner should call
    /// [`StorageController::process`] to process the ready message.
    ///
    /// This method is cancellation safe.
    async fn ready(&mut self);

    /// Processes the work queued by [`StorageController::ready`].
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<Self::Timestamp>>, anyhow::Error>;

    /// Exposes the internal state of the data shard for debugging and QA.
    ///
    /// We'll be thoughtful about making unnecessary changes, but the **output
    /// of this method needs to be gated from users**, so that it's not subject
    /// to our backward compatibility guarantees.
    ///
    /// TODO: Ideally this would return `impl Serialize` so the caller can do
    /// with it what they like, but that doesn't work in traits yet. The
    /// workaround (an associated type) doesn't work because persist doesn't
    /// want to make the type public. In the meantime, move the `serde_json`
    /// call from the single user into this method.
    async fn inspect_persist_state(&self, id: GlobalId)
    -> Result<serde_json::Value, anyhow::Error>;

    /// Records append-only updates for the given introspection type.
    ///
    /// Rows passed in `updates` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn append_introspection_updates(&mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>);

    /// Records append-only status updates for the given introspection type.
    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    );

    /// Updates the desired state of the given introspection type.
    ///
    /// Rows passed in `op` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp);

    /// Returns a sender for updates to the specified append-only introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with an append-only collection.
    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;

    /// Returns a sender for updates to the specified differential introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with a differential collection.
    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;

    /// Determines a "real-time recent" timestamp for the given sources,
    /// subject to `timeout`.
    ///
    /// NOTE(review): per the signature, the outer `Result` reports immediately
    /// visible errors, while the returned [`BoxFuture`] yields the timestamp
    /// (or an error encountered while waiting) — confirm the precise meaning
    /// of "real-time recent" with the implementation.
    async fn real_time_recent_timestamp(
        &self,
        source_ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;

    /// Returns the state of the [`StorageController`] formatted as JSON.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}
741
742impl<T> DataSource<T> {
743    /// Returns true if the storage controller manages the data shard for this
744    /// source using txn-wal.
745    pub fn in_txns(&self) -> bool {
746        match self {
747            DataSource::Table => true,
748            DataSource::Other
749            | DataSource::Ingestion(_)
750            | DataSource::IngestionExport { .. }
751            | DataSource::Introspection(_)
752            | DataSource::Progress
753            | DataSource::Webhook => false,
754            DataSource::Sink { .. } => false,
755        }
756    }
757}
758
/// A wrapper struct that presents the adapter token to a format that is understandable by persist
/// and also allows us to differentiate between a token being present versus being set for the
/// first time.
///
/// `None` is the initial, never-set state (see the [`Opaque`] impl) and is encoded on the
/// wire as `0`, a value that `NonZeroI64` can never hold (see the [`Codec64`] impl).
#[derive(PartialEq, Clone, Debug)]
pub struct PersistEpoch(pub Option<NonZeroI64>);
764
765impl Opaque for PersistEpoch {
766    fn initial() -> Self {
767        PersistEpoch(None)
768    }
769}
770
771impl Codec64 for PersistEpoch {
772    fn codec_name() -> String {
773        "PersistEpoch".to_owned()
774    }
775
776    fn encode(&self) -> [u8; 8] {
777        self.0.map(NonZeroI64::get).unwrap_or(0).to_le_bytes()
778    }
779
780    fn decode(buf: [u8; 8]) -> Self {
781        Self(NonZeroI64::new(i64::from_le_bytes(buf)))
782    }
783}
784
785impl From<NonZeroI64> for PersistEpoch {
786    fn from(epoch: NonZeroI64) -> Self {
787        Self(Some(epoch))
788    }
789}
790
/// State maintained about individual exports.
#[derive(Debug)]
pub struct ExportState<T: TimelyTimestamp> {
    /// Really only for keeping track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<T>,

    /// The cluster this export is associated with.
    pub cluster_id: StorageInstanceId,

    /// The current since frontier, derived from `write_frontier` using
    /// `read_policy`.
    pub derived_since: Antichain<T>,

    /// The read holds that this export has on its dependencies (its input and itself). When
    /// the upper of the export changes, we downgrade this, which in turn
    /// downgrades holds we have on our dependencies' sinces.
    ///
    /// The hold on the input collection is at index 0, the hold on the export's
    /// own collection at index 1 (see `ExportState::new` and `input_hold`).
    pub read_holds: [ReadHold<T>; 2],

    /// The policy to use to downgrade `derived_since`.
    pub read_policy: ReadPolicy<T>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,
}
815
816impl<T: Timestamp> ExportState<T> {
817    pub fn new(
818        cluster_id: StorageInstanceId,
819        read_hold: ReadHold<T>,
820        self_hold: ReadHold<T>,
821        write_frontier: Antichain<T>,
822        read_policy: ReadPolicy<T>,
823    ) -> Self
824    where
825        T: Lattice,
826    {
827        let mut dependency_since = Antichain::from_elem(T::minimum());
828        for read_hold in [&read_hold, &self_hold] {
829            dependency_since.join_assign(read_hold.since());
830        }
831        Self {
832            read_capabilities: MutableAntichain::from(dependency_since.borrow()),
833            cluster_id,
834            derived_since: dependency_since,
835            read_holds: [read_hold, self_hold],
836            read_policy,
837            write_frontier,
838        }
839    }
840
841    /// Returns the cluster to which the export is bound.
842    pub fn cluster_id(&self) -> StorageInstanceId {
843        self.cluster_id
844    }
845
846    /// Returns the cluster to which the export is bound.
847    pub fn input_hold(&self) -> &ReadHold<T> {
848        &self.read_holds[0]
849    }
850
851    /// Returns whether the export was dropped.
852    pub fn is_dropped(&self) -> bool {
853        self.read_holds.iter().all(|h| h.since().is_empty())
854    }
855}
/// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`].
///
/// See `CollectionManager::monotonic_appender` to acquire a [`MonotonicAppender`].
#[derive(Clone, Debug)]
pub struct MonotonicAppender<T> {
    /// Channel that sends to a [`tokio::task`] which pushes updates to Persist.
    /// Each message carries a batch of updates plus a oneshot sender on which
    /// the receiving task reports the outcome of the append (see `append`).
    tx: mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,
}
867
868impl<T> MonotonicAppender<T> {
869    pub fn new(
870        tx: mpsc::UnboundedSender<(
871            Vec<AppendOnlyUpdate>,
872            oneshot::Sender<Result<(), StorageError<T>>>,
873        )>,
874    ) -> Self {
875        MonotonicAppender { tx }
876    }
877
878    pub async fn append(&self, updates: Vec<AppendOnlyUpdate>) -> Result<(), StorageError<T>> {
879        let (tx, rx) = oneshot::channel();
880
881        // Send our update to the CollectionManager.
882        self.tx
883            .send((updates, tx))
884            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
885
886        // Wait for a response, if we fail to receive then the CollectionManager has gone away.
887        let result = rx
888            .await
889            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
890
891        result
892    }
893}
894
/// A wallclock lag measurement.
///
/// The enum representation reflects the fact that wallclock lag is undefined for unreadable
/// collections, i.e. collections that contain no readable times.
///
/// `Undefined` sorts after any `Seconds` value (both in the derived `Ord` and under
/// [`WallclockLag::max`]) and converts to `Datum::Null` in the datum conversions.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum WallclockLag {
    /// Lag value in seconds, for readable collections.
    Seconds(u64),
    /// Undefined lag, for unreadable collections.
    Undefined,
}
906
907impl WallclockLag {
908    /// The smallest possible wallclock lag measurement.
909    pub const MIN: Self = Self::Seconds(0);
910
911    /// Return the maximum of two lag values.
912    ///
913    /// We treat `Undefined` lags as greater than `Seconds`, to ensure we never report low lag
914    /// values when a collection was actually unreadable for some amount of time.
915    pub fn max(self, other: Self) -> Self {
916        match (self, other) {
917            (Self::Seconds(a), Self::Seconds(b)) => Self::Seconds(a.max(b)),
918            (Self::Undefined, _) | (_, Self::Undefined) => Self::Undefined,
919        }
920    }
921
922    /// Return the wrapped seconds value, or a default if the lag is `Undefined`.
923    pub fn unwrap_seconds_or(self, default: u64) -> u64 {
924        match self {
925            Self::Seconds(s) => s,
926            Self::Undefined => default,
927        }
928    }
929
930    /// Create a new `WallclockLag` by transforming the wrapped seconds value.
931    pub fn map_seconds(self, f: impl FnOnce(u64) -> u64) -> Self {
932        match self {
933            Self::Seconds(s) => Self::Seconds(f(s)),
934            Self::Undefined => Self::Undefined,
935        }
936    }
937
938    /// Convert this lag value into a [`Datum::Interval`] or [`Datum::Null`].
939    pub fn into_interval_datum(self) -> Datum<'static> {
940        match self {
941            Self::Seconds(secs) => {
942                let micros = i64::try_from(secs * 1_000_000).expect("must fit");
943                Datum::Interval(Interval::new(0, 0, micros))
944            }
945            Self::Undefined => Datum::Null,
946        }
947    }
948
949    /// Convert this lag value into a [`Datum::UInt64`] or [`Datum::Null`].
950    pub fn into_uint64_datum(self) -> Datum<'static> {
951        match self {
952            Self::Seconds(secs) => Datum::UInt64(secs),
953            Self::Undefined => Datum::Null,
954        }
955    }
956}
957
/// The period covered by a wallclock lag histogram, represented as a `[start, end)` range.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WallclockLagHistogramPeriod {
    /// Inclusive start of the period.
    pub start: CheckedTimestamp<DateTime<Utc>>,
    /// Exclusive end of the period.
    pub end: CheckedTimestamp<DateTime<Utc>>,
}
964
965impl WallclockLagHistogramPeriod {
966    /// Construct a `WallclockLagHistogramPeriod` from the given epoch timestamp and dyncfg.
967    pub fn from_epoch_millis(epoch_ms: u64, dyncfg: &ConfigSet) -> Self {
968        let interval = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.get(dyncfg);
969        let interval_ms = u64::try_from(interval.as_millis()).unwrap_or_else(|_| {
970            soft_panic_or_log!("excessive wallclock lag histogram period interval: {interval:?}");
971            let default = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.default();
972            u64::try_from(default.as_millis()).unwrap()
973        });
974        let interval_ms = std::cmp::max(interval_ms, 1);
975
976        let start_ms = epoch_ms - (epoch_ms % interval_ms);
977        let start_dt = mz_ore::now::to_datetime(start_ms);
978        let start = start_dt.try_into().expect("must fit");
979
980        let end_ms = start_ms + interval_ms;
981        let end_dt = mz_ore::now::to_datetime(end_ms);
982        let end = end_dt.try_into().expect("must fit");
983
984        Self { start, end }
985    }
986}
987
#[cfg(test)]
mod tests {
    use super::*;

    /// A `lag_writes_by` policy with zero lag must leave the write frontier unchanged.
    #[mz_ore::test]
    fn lag_writes_by_zero() {
        let zero = mz_repr::Timestamp::default();
        let policy = ReadPolicy::lag_writes_by(zero, zero);
        let upper = Antichain::from_elem(mz_repr::Timestamp::from(5));
        assert_eq!(policy.frontier(upper.borrow()), upper);
    }
}