mz_storage_client/controller.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A controller that provides an interface to the storage layer.
//!
//! The storage controller curates the creation of sources, the progress of readers through these collections,
//! and their eventual dropping and resource reclamation.
//!
//! The storage controller can be viewed as a partial map from `GlobalId` to collection. It is an error to
//! use an identifier before it has been "created" with `create_collections()`. Once created, the controller
//! holds a read capability for each source, which is manipulated with `update_read_capabilities()`.
//! Eventually, the source is dropped with either `drop_sources()` or by allowing compaction to the
//! empty frontier.
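//!
//! A minimal sketch of that lifecycle, assuming an already-constructed controller and
//! placeholder `id`, `desc`, and `metadata` values (hypothetical, for illustration only):
//!
//! ```ignore
//! // Register the collection; reads at times beyond the initial `since` are now valid.
//! controller
//!     .create_collections(&metadata, None, vec![(id, desc)])
//!     .await?;
//! // ... read from and maintain the collection ...
//! // Drop it again, releasing the controller's read capability.
//! controller.drop_sources(&metadata, vec![id])?;
//! ```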

use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZeroI64;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use differential_dataflow::lattice::Lattice;
use mz_cluster_client::ReplicaId;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_controller_types::dyncfgs::WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL;
use mz_dyncfg::ConfigSet;
use mz_ore::soft_panic_or_log;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_types::{Codec64, Opaque, ShardId};
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Diff, GlobalId, RelationDesc, RelationVersion, Row};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::InlinedConnection;
use mz_storage_types::controller::{CollectionMetadata, StorageError};
use mz_storage_types::instances::StorageInstanceId;
use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::read_holds::ReadHold;
use mz_storage_types::read_policy::ReadPolicy;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::{
    GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
    SourceExportDetails, Timeline,
};
use serde::{Deserialize, Serialize};
use timely::progress::Timestamp as TimelyTimestamp;
use timely::progress::frontier::MutableAntichain;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};

use crate::client::{AppendOnlyUpdate, StatusUpdate, TableData};
use crate::statistics::WebhookStatistics;

#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash, PartialOrd, Ord)]
pub enum IntrospectionType {
    /// We're not responsible for appending to this collection automatically, but we should
    /// automatically bump the write frontier from time to time.
    SinkStatusHistory,
    SourceStatusHistory,
    ShardMapping,

    Frontiers,
    ReplicaFrontiers,

    ReplicaStatusHistory,
    ReplicaMetricsHistory,
    WallclockLagHistory,
    WallclockLagHistogram,

    // Note that this single-shard introspection source will be changed to per-replica,
    // once we allow multiplexing multiple sources/sinks on a single cluster.
    StorageSourceStatistics,
    StorageSinkStatistics,

    // The below are for statement logging.
    StatementExecutionHistory,
    SessionHistory,
    PreparedStatementHistory,
    SqlText,
    // For statement lifecycle logging, which is closely related
    // to statement logging
    StatementLifecycleHistory,

    // Collections written by the compute controller.
    ComputeDependencies,
    ComputeOperatorHydrationStatus,
    ComputeMaterializedViewRefreshes,
    ComputeErrorCounts,
    ComputeHydrationTimes,

    // Written by the Adapter for tracking AWS PrivateLink Connection Status History
    PrivatelinkConnectionStatusHistory,
}

/// Describes how data is written to the collection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DataSource<T> {
    /// Ingest data from some external source.
    Ingestion(IngestionDescription),
    /// This source receives its data from the identified ingestion,
    /// from an external object identified using `SourceExportDetails`.
    ///
    /// The referenced ingestion must be created before all of its exports.
    IngestionExport {
        ingestion_id: GlobalId,
        details: SourceExportDetails,
        data_config: SourceExportDataConfig,
    },
    /// Data comes from introspection sources, which the controller itself is
    /// responsible for generating.
    Introspection(IntrospectionType),
    /// Data comes from the source's remapping/reclock operator.
    Progress,
    /// Data comes from external HTTP requests pushed to Materialize.
    Webhook,
    /// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
    Table {
        /// This table has had columns added or dropped to it, so we're now a
        /// "view" over the "primary" Table/collection. Within the
        /// `storage-controller` we treat the primary as a dependency.
        primary: Option<GlobalId>,
    },
    /// This source's data does not need to be managed by the storage
    /// controller, e.g. it's a materialized view or the catalog collection.
    Other,
    /// This collection is the output collection of a sink.
    Sink { desc: ExportDescription<T> },
}

/// Describes a request to create a source.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollectionDescription<T> {
    /// The schema of this collection
    pub desc: RelationDesc,
    /// The source of this collection's data.
    pub data_source: DataSource<T>,
    /// An optional frontier to which the collection's `since` should be advanced.
    pub since: Option<Antichain<T>>,
    /// An optional `GlobalId` identifying the status collection to use for this collection.
    /// Used to keep track of source status/error information.
    pub status_collection_id: Option<GlobalId>,
    /// The timeline of the source. Absent for materialized views, continual tasks, etc.
    pub timeline: Option<Timeline>,
}

impl<T> CollectionDescription<T> {
    pub fn for_table(desc: RelationDesc, primary: Option<GlobalId>) -> Self {
        Self {
            desc,
            data_source: DataSource::Table { primary },
            since: None,
            status_collection_id: None,
            timeline: Some(Timeline::EpochMilliseconds),
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExportDescription<T = mz_repr::Timestamp> {
    pub sink: StorageSinkDesc<(), T>,
    /// The ID of the instance in which to install the export.
    pub instance_id: StorageInstanceId,
}

#[derive(Debug)]
pub enum Response<T> {
    FrontierUpdates(Vec<(GlobalId, Antichain<T>)>),
}

/// Metadata that the storage controller must know to properly handle the life
/// cycle of creating and dropping collections.
///
/// This data should be kept consistent with the state modified using
/// [`StorageTxn`].
///
/// n.b. the "txn WAL shard" is also metadata that's persisted, but if we
/// included it in this struct it would never be read.
#[derive(Debug, Clone, Serialize, Default)]
pub struct StorageMetadata {
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collection_metadata: BTreeMap<GlobalId, ShardId>,
    pub unfinalized_shards: BTreeSet<ShardId>,
}

impl StorageMetadata {
    pub fn get_collection_shard<T>(&self, id: GlobalId) -> Result<ShardId, StorageError<T>> {
        let shard_id = self
            .collection_metadata
            .get(&id)
            .ok_or(StorageError::IdentifierMissing(id))?;

        Ok(*shard_id)
    }
}

/// Provides an interface for the storage controller to read and write data that
/// is recorded elsewhere.
///
/// Data written to the implementor of this trait should make a consistent view
/// of the data available through [`StorageMetadata`].
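///
/// A minimal sketch of the intended contract, assuming some implementor `txn`
/// (hypothetical, for illustration only):
///
/// ```ignore
/// let id = GlobalId::User(1);
/// let shard = ShardId::new();
/// txn.insert_collection_metadata(BTreeMap::from([(id, shard)]))?;
/// // Later reads through the same trait must observe the inserted mapping.
/// assert_eq!(txn.get_collection_metadata().get(&id), Some(&shard));
/// ```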
#[async_trait]
pub trait StorageTxn<T> {
    /// Retrieve all of the visible storage metadata.
    ///
    /// The value of this map should be treated as opaque.
    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId>;

    /// Add new storage metadata for a collection.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must include
    /// this data.
    fn insert_collection_metadata(
        &mut self,
        s: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError<T>>;

    /// Remove the metadata associated with the identified collections.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must not
    /// include these keys.
    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)>;

    /// Retrieve all of the shards that are no longer in use by an active
    /// collection but are yet to be finalized.
    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId>;

    /// Insert the specified values as unfinalized shards.
    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError<T>>;

    /// Mark the specified shards as finalized, deleting them from the
    /// unfinalized shard collection.
    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>);

    /// Get the txn WAL shard for this environment if it exists.
    fn get_txn_wal_shard(&self) -> Option<ShardId>;

    /// Store the specified shard as the environment's txn WAL shard.
    ///
    /// The implementor should error if the shard is already specified.
    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError<T>>;
}

pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

/// A predicate for a `Row` filter.
pub type RowPredicate = Box<dyn Fn(&Row) -> bool + Send + Sync>;

/// High-level write operations applicable to storage collections.
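///
/// A minimal sketch of both operations, with placeholder `row_a`, `row_b`, `insert_diff`,
/// `retract_diff`, and `matches` values (hypothetical, for illustration only):
///
/// ```ignore
/// // A single `Append` can mix insertions (positive diffs) and retractions (negative diffs).
/// let append = StorageWriteOp::Append {
///     updates: vec![(row_a, insert_diff), (row_b, retract_diff)],
/// };
/// // Delete every row for which the predicate returns `true`.
/// let delete = StorageWriteOp::Delete {
///     filter: Box::new(move |row| matches(row)),
/// };
/// ```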
pub enum StorageWriteOp {
    /// Append a set of rows with specified multiplicities.
    ///
    /// The multiplicities may be negative, so an `Append` operation can perform
    /// both insertions and retractions.
    Append { updates: Vec<(Row, Diff)> },
    /// Delete all rows matching the given predicate.
    Delete { filter: RowPredicate },
}

impl StorageWriteOp {
    /// Returns whether this operation appends an empty set of updates.
    pub fn is_empty_append(&self) -> bool {
        match self {
            Self::Append { updates } => updates.is_empty(),
            Self::Delete { .. } => false,
        }
    }
}

#[async_trait(?Send)]
pub trait StorageController: Debug {
    type Timestamp: TimelyTimestamp;

    /// Marks the end of any initialization commands.
    ///
    /// The implementor may wait for this method to be called before implementing prior commands,
    /// and so it is important for a user to invoke this method as soon as it is comfortable.
    /// This method can be invoked immediately, at the potential expense of performance.
    fn initialization_complete(&mut self);

    /// Update storage configuration with new parameters.
    fn update_parameters(&mut self, config_params: StorageParameters);

    /// Get the current configuration, including parameters updated with `update_parameters`.
    fn config(&self) -> &StorageConfiguration;

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(
        &self,
        id: GlobalId,
    ) -> Result<CollectionMetadata, StorageError<Self::Timestamp>>;

    /// Returns `true` iff the given collection/ingestion has been hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    fn collection_hydrated(
        &self,
        collection_id: GlobalId,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns `true` if each non-transient, non-excluded collection is
    /// hydrated on at least one of the provided replicas.
    ///
    /// If no replicas are provided, this checks for hydration on _any_ replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError<Self::Timestamp>>;

    /// Returns the since/upper frontiers of the identified collection.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<
        (Antichain<Self::Timestamp>, Antichain<Self::Timestamp>),
        StorageError<Self::Timestamp>,
    >;

    /// Returns the since/upper frontiers of the identified collections.
    ///
    /// Having a method that returns both frontiers at the same time, for all
    /// requested collections, ensures that we can get a consistent "snapshot"
    /// of collection state. If we had separate methods instead, and/or would
    /// allow getting frontiers for collections one at a time, it could happen
    /// that collection state changes concurrently, while information is
    /// gathered.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<
        Vec<(
            GlobalId,
            Antichain<Self::Timestamp>,
            Antichain<Self::Timestamp>,
        )>,
        StorageError<Self::Timestamp>,
    >;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the IDs of all active ingestions for the given storage instance.
    fn active_ingestions(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError<Self::Timestamp>>;

    /// Creates a storage instance with the specified ID.
    ///
    /// A storage instance can have zero or one replicas. The instance is
    /// created with zero replicas.
    ///
    /// Panics if a storage instance with the given ID already exists.
    fn create_instance(&mut self, id: StorageInstanceId);

    /// Drops the storage instance with the given ID.
    ///
    /// If you call this method while the storage instance has a replica
    /// attached, that replica will be leaked. Call `drop_replica` first.
    ///
    /// Panics if a storage instance with the given ID does not exist.
    fn drop_instance(&mut self, id: StorageInstanceId);

    /// Updates a storage instance's workload class.
    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    );

    /// Connects the storage instance to the specified replica.
    ///
    /// If the storage instance is already attached to a replica, communication
    /// with that replica is severed in favor of the new replica.
    ///
    /// In the future, this API will be adjusted to support active replication
    /// of storage instances (i.e., multiple replicas attached to a given
    /// storage instance).
    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    );

    /// Disconnects the storage instance from the specified replica.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId);

    /// Across versions of Materialize the nullability of columns for some objects can change based
    /// on updates to our optimizer.
    ///
    /// During bootstrap we will register these new schemas with Persist.
    ///
    /// See: <https://github.com/MaterializeInc/database-issues/issues/2488>
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Create the sources described in the individual RunIngestionCommand commands.
    ///
    /// Each command carries the source id, the source description, and any associated metadata
    /// needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and they are
    /// now valid to use in queries at times beyond the initial `since` frontiers. Each
    /// collection also acquires a read capability at this frontier, which will need to
    /// be repeatedly downgraded with `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; it can fail between processing of different
    /// collections and leave the controller in an inconsistent state. It is almost
    /// always wrong to do anything but abort the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are available for reads. (We
    /// might later give non-tables the same treatment, but hold off on that initially.) Callers
    /// must provide a `Some` if any of the collections is a table; `None` may be given only if
    /// none of the collections is a table (i.e. all are materialized views, sources, etc.).
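    ///
    /// A minimal sketch of the `register_ts` rule above, with placeholder `table_id`,
    /// `table_desc`, `now_ts`, and `storage_metadata` values (hypothetical, for illustration
    /// only):
    ///
    /// ```ignore
    /// // The batch contains a table, so a register timestamp must be supplied.
    /// let collections = vec![(table_id, CollectionDescription::for_table(table_desc, None))];
    /// controller
    ///     .create_collections(&storage_metadata, Some(now_ts), collections)
    ///     .await?;
    /// ```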
    async fn create_collections(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
    ) -> Result<(), StorageError<Self::Timestamp>> {
        self.create_collections_for_bootstrap(
            storage_metadata,
            register_ts,
            collections,
            &BTreeSet::new(),
        )
        .await
    }

    /// Like [`Self::create_collections`], except used specifically for bootstrap.
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Self::Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription<Self::Timestamp>)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Check that the ingestion associated with `id` can use the provided
    /// [`SourceDesc`].
    ///
    /// Note that this check is optimistic: a return of `Ok(())` does not
    /// guarantee that subsequent calls to `alter_ingestion_source_desc` will
    /// succeed.
    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the identified collection to use the provided [`SourceDesc`].
    async fn alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: SourceDesc,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters each identified collection to use the correlated [`GenericSourceConnection`].
    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the data config for the specified source exports of the specified ingestions.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alters the identified table to use the new schema `new_desc`, registering
    /// `new_collection` as the new version of `existing_collection` at
    /// `expected_version`, effective as of `register_ts`.
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Acquire an immutable reference to the export state, should it exist.
    fn export(
        &self,
        id: GlobalId,
    ) -> Result<&ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Acquire a mutable reference to the export state, should it exist.
    fn export_mut(
        &mut self,
        id: GlobalId,
    ) -> Result<&mut ExportState<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Create a oneshot ingestion.
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Cancel a oneshot ingestion.
    fn cancel_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Alter the sink identified by the given id to match the provided `ExportDescription`.
    async fn alter_export(
        &mut self,
        id: GlobalId,
        export: ExportDescription<Self::Timestamp>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// For each identified export, alter its [`StorageSinkConnection`].
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the tables and allows their resources to be reclaimed.
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Self::Timestamp,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    ///     controller starts/restarts it has no durable state. That means that it has no way of
    ///     remembering any past commands sent. In the future we plan on persisting state for the
    ///     controller so that it is aware of past commands.
    ///     Therefore this method is for dropping sinks that we know to have been previously
    ///     created, but have been forgotten by the controller due to a restart.
    ///     Once command history becomes durable we can remove this method and use the normal
    ///     `drop_sinks`.
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    ///     controller starts/restarts it has no durable state. That means that it has no way of
    ///     remembering any past commands sent. In the future we plan on persisting state for the
    ///     controller so that it is aware of past commands.
    ///     Therefore this method is for dropping sources that we know to have been previously
    ///     created, but have been forgotten by the controller due to a restart.
    ///     Once command history becomes durable we can remove this method and use the normal
    ///     `drop_sources`.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError<Self::Timestamp>>;

    /// Append `updates` into the local input named `id` and advance its upper to `upper`.
    ///
    /// The method returns a oneshot that can be awaited to indicate completion of the write.
    /// The method may return an error, indicating an immediately visible error, and also the
    /// oneshot may return an error if one is encountered during the write.
    ///
    /// All updates in `commands` are applied atomically.
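    ///
    /// A minimal sketch of a single atomic table write, with placeholder `table_id`,
    /// `table_data`, `write_ts`, and `advance_to` values (hypothetical, for illustration only):
    ///
    /// ```ignore
    /// // `table_data` is a `Vec<TableData>` built elsewhere.
    /// let ack = controller.append_table(write_ts, advance_to, vec![(table_id, table_data)])?;
    /// // The write has been applied once the oneshot resolves to `Ok(())`.
    /// ack.await.expect("controller is running")?;
    /// ```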
    // TODO(petrosagg): switch upper to `Antichain<Timestamp>`
    fn append_table(
        &mut self,
        write_ts: Self::Timestamp,
        advance_to: Self::Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<
        tokio::sync::oneshot::Receiver<Result<(), StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;

    /// Returns a [`MonotonicAppender`] which is a channel that can be used to monotonically
    /// append to the specified [`GlobalId`].
    fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<Self::Timestamp>, StorageError<Self::Timestamp>>;

    /// Returns a shared [`WebhookStatistics`] which can be used to report user-facing
    /// statistics for the given webhook, specified by the [`GlobalId`].
    ///
    // This is used to support a fairly special case, where a source needs to report statistics
    // from outside the ordinary controller-clusterd path. It's possible to merge this with
    // `monotonic_appender`, whose only current user is webhooks, but given that they will
    // likely be moved to clusterd, we just leave this as a special case.
    fn webhook_statistics(
        &self,
        id: GlobalId,
    ) -> Result<Arc<WebhookStatistics>, StorageError<Self::Timestamp>>;

    /// Waits until the controller is ready to process a response.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the owner should call
    /// [`StorageController::process`] to process the ready message.
    ///
    /// This method is cancellation safe.
    async fn ready(&mut self);

    /// Processes the work queued by [`StorageController::ready`].
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response<Self::Timestamp>>, anyhow::Error>;

    /// Exposes the internal state of the data shard for debugging and QA.
    ///
    /// We'll be thoughtful about making unnecessary changes, but the **output
    /// of this method needs to be gated from users**, so that it's not subject
    /// to our backward compatibility guarantees.
    ///
    /// TODO: Ideally this would return `impl Serialize` so the caller can do
    /// with it what they like, but that doesn't work in traits yet. The
    /// workaround (an associated type) doesn't work because persist doesn't
    /// want to make the type public. In the meantime, move the `serde_json`
    /// call from the single user into this method.
    async fn inspect_persist_state(&self, id: GlobalId)
    -> Result<serde_json::Value, anyhow::Error>;

    /// Records append-only updates for the given introspection type.
    ///
    /// Rows passed in `updates` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn append_introspection_updates(&mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>);

    /// Records append-only status updates for the given introspection type.
    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    );

    /// Updates the desired state of the given introspection type.
    ///
    /// Rows passed in `op` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp);

    /// Returns a sender for updates to the specified append-only introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with an append-only collection.
    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;

    /// Returns a sender for updates to the specified differential introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with a differential collection.
    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        StorageWriteOp,
        oneshot::Sender<Result<(), StorageError<Self::Timestamp>>>,
    )>;

    /// Returns a future that resolves to a "real-time recent" timestamp for the given
    /// sources, i.e. one that reflects the data visible in their upstream systems at the
    /// time of this call, subject to `timeout`.
    async fn real_time_recent_timestamp(
        &self,
        source_ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<
        BoxFuture<Result<Self::Timestamp, StorageError<Self::Timestamp>>>,
        StorageError<Self::Timestamp>,
    >;
}

impl<T> DataSource<T> {
    /// Returns true if the storage controller manages the data shard for this
    /// source using txn-wal.
    pub fn in_txns(&self) -> bool {
        match self {
            DataSource::Table { .. } => true,
            DataSource::Other
            | DataSource::Ingestion(_)
            | DataSource::IngestionExport { .. }
            | DataSource::Introspection(_)
            | DataSource::Progress
            | DataSource::Webhook => false,
            DataSource::Sink { .. } => false,
        }
    }
}

/// A wrapper struct that presents the adapter token in a format that is understandable by persist
/// and also allows us to differentiate between a token being present versus being set for the
/// first time.
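///
/// A minimal sketch of the `Codec64` round-trip below, where the absent case encodes as
/// zero (for illustration only):
///
/// ```ignore
/// let epoch = PersistEpoch::from(NonZeroI64::new(42).unwrap());
/// assert_eq!(PersistEpoch::decode(epoch.encode()), epoch);
/// // `None` encodes as 0, which decodes back to `None`.
/// assert_eq!(PersistEpoch::decode(PersistEpoch(None).encode()), PersistEpoch(None));
/// ```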
#[derive(PartialEq, Clone, Debug)]
pub struct PersistEpoch(pub Option<NonZeroI64>);

impl Opaque for PersistEpoch {
    fn initial() -> Self {
        PersistEpoch(None)
    }
}

impl Codec64 for PersistEpoch {
    fn codec_name() -> String {
        "PersistEpoch".to_owned()
    }

    fn encode(&self) -> [u8; 8] {
        self.0.map(NonZeroI64::get).unwrap_or(0).to_le_bytes()
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self(NonZeroI64::new(i64::from_le_bytes(buf)))
    }
}

impl From<NonZeroI64> for PersistEpoch {
    fn from(epoch: NonZeroI64) -> Self {
        Self(Some(epoch))
    }
}

/// State maintained about individual exports.
#[derive(Debug)]
pub struct ExportState<T: TimelyTimestamp> {
    /// Really only for keeping track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<T>,

    /// The cluster this export is associated with.
    pub cluster_id: StorageInstanceId,

    /// The current since frontier, derived from `write_frontier` using
    /// `hold_policy`.
    pub derived_since: Antichain<T>,

    /// The read holds that this export has on its dependencies (its input and itself). When
    /// the upper of the export changes, we downgrade this, which in turn
    /// downgrades holds we have on our dependencies' sinces.
    pub read_holds: [ReadHold<T>; 2],

    /// The policy to use to downgrade `self.read_capability`.
    pub read_policy: ReadPolicy<T>,

    /// Reported write frontier.
    pub write_frontier: Antichain<T>,
}

impl<T: Timestamp> ExportState<T> {
    pub fn new(
        cluster_id: StorageInstanceId,
        read_hold: ReadHold<T>,
        self_hold: ReadHold<T>,
        write_frontier: Antichain<T>,
        read_policy: ReadPolicy<T>,
    ) -> Self
    where
        T: Lattice,
    {
        let mut dependency_since = Antichain::from_elem(T::minimum());
        for read_hold in [&read_hold, &self_hold] {
            dependency_since.join_assign(read_hold.since());
        }
        Self {
            read_capabilities: MutableAntichain::from(dependency_since.borrow()),
            cluster_id,
            derived_since: dependency_since,
            read_holds: [read_hold, self_hold],
            read_policy,
            write_frontier,
        }
    }

    /// Returns the cluster to which the export is bound.
    pub fn cluster_id(&self) -> StorageInstanceId {
        self.cluster_id
    }

    /// Returns the read hold this export has on its input collection.
    pub fn input_hold(&self) -> &ReadHold<T> {
        &self.read_holds[0]
    }

    /// Returns whether the export was dropped.
    pub fn is_dropped(&self) -> bool {
        self.read_holds.iter().all(|h| h.since().is_empty())
    }
}

/// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`].
///
/// See `CollectionManager::monotonic_appender` to acquire a [`MonotonicAppender`].
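///
/// A minimal sketch of acquiring and using an appender, with placeholder `webhook_id` and
/// `updates` values (hypothetical, for illustration only):
///
/// ```ignore
/// let appender = controller.monotonic_appender(webhook_id)?;
/// // The updates are written at a timestamp chosen by the collection manager.
/// appender.append(updates).await?;
/// ```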
#[derive(Clone, Debug)]
pub struct MonotonicAppender<T> {
    /// Channel that sends to a [`tokio::task`] which pushes updates to Persist.
    tx: mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,
}

impl<T> MonotonicAppender<T> {
    pub fn new(
        tx: mpsc::UnboundedSender<(
            Vec<AppendOnlyUpdate>,
            oneshot::Sender<Result<(), StorageError<T>>>,
        )>,
    ) -> Self {
        MonotonicAppender { tx }
    }

    pub async fn append(&self, updates: Vec<AppendOnlyUpdate>) -> Result<(), StorageError<T>> {
        let (tx, rx) = oneshot::channel();

        // Send our update to the CollectionManager.
        self.tx
            .send((updates, tx))
            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;

        // Wait for a response; if we fail to receive one, the CollectionManager has gone away.
        let result = rx
            .await
            .map_err(|_| StorageError::ShuttingDown("collection manager"))?;

        result
    }
}

/// The period covered by a wallclock lag histogram, represented as a `[start, end)` range.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WallclockLagHistogramPeriod {
    pub start: CheckedTimestamp<DateTime<Utc>>,
    pub end: CheckedTimestamp<DateTime<Utc>>,
}

impl WallclockLagHistogramPeriod {
    /// Construct a `WallclockLagHistogramPeriod` from the given epoch timestamp and dyncfg.
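    ///
    /// As a worked example of the bucketing arithmetic (assuming a hypothetical period
    /// interval of 60_000 ms): an `epoch_ms` of 125_000 falls into the `[120_000, 180_000)`
    /// bucket, since `start = epoch_ms - (epoch_ms % interval_ms)` and `end = start + interval_ms`.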
    pub fn from_epoch_millis(epoch_ms: u64, dyncfg: &ConfigSet) -> Self {
        let interval = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.get(dyncfg);
        let interval_ms = u64::try_from(interval.as_millis()).unwrap_or_else(|_| {
            soft_panic_or_log!("excessive wallclock lag histogram period interval: {interval:?}");
            let default = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.default();
            u64::try_from(default.as_millis()).unwrap()
        });
        let interval_ms = std::cmp::max(interval_ms, 1);

        let start_ms = epoch_ms - (epoch_ms % interval_ms);
        let start_dt = mz_ore::now::to_datetime(start_ms);
        let start = start_dt.try_into().expect("must fit");

        let end_ms = start_ms + interval_ms;
        let end_dt = mz_ore::now::to_datetime(end_ms);
        let end = end_dt.try_into().expect("must fit");

        Self { start, end }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn lag_writes_by_zero() {
        let policy =
            ReadPolicy::lag_writes_by(mz_repr::Timestamp::default(), mz_repr::Timestamp::default());
        let write_frontier = Antichain::from_elem(mz_repr::Timestamp::from(5));
        assert_eq!(policy.frontier(write_frontier.borrow()), write_frontier);
    }
}