mz_storage_client/controller.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller that provides an interface to the storage layer.
11//!
12//! The storage controller curates the creation of sources, the progress of readers through these collections,
13//! and their eventual dropping and resource reclamation.
14//!
15//! The storage controller can be viewed as a partial map from `GlobalId` to collection. It is an error to
16//! use an identifier before it has been "created" with `create_source()`. Once created, the controller holds
17//! a read capability for each source, which is manipulated with `update_read_capabilities()`.
18//! Eventually, the source is dropped with either `drop_sources()` or by allowing compaction to the
19//! empty frontier.
20
21use std::collections::{BTreeMap, BTreeSet};
22use std::fmt::Debug;
23use std::future::Future;
24use std::num::NonZeroI64;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::time::Duration;
28
29use async_trait::async_trait;
30use chrono::{DateTime, Utc};
31use differential_dataflow::lattice::Lattice;
32use mz_cluster_client::ReplicaId;
33use mz_cluster_client::client::ClusterReplicaLocation;
34use mz_controller_types::dyncfgs::WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL;
35use mz_dyncfg::ConfigSet;
36use mz_ore::soft_panic_or_log;
37use mz_persist_client::batch::ProtoBatch;
38use mz_persist_types::{Codec64, ShardId};
39use mz_repr::adt::interval::Interval;
40use mz_repr::adt::timestamp::CheckedTimestamp;
41use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, Timestamp};
42use mz_storage_types::configuration::StorageConfiguration;
43use mz_storage_types::connections::inline::InlinedConnection;
44use mz_storage_types::controller::{CollectionMetadata, StorageError};
45use mz_storage_types::errors::CollectionMissing;
46use mz_storage_types::instances::StorageInstanceId;
47use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
48use mz_storage_types::parameters::StorageParameters;
49use mz_storage_types::read_holds::ReadHold;
50use mz_storage_types::read_policy::ReadPolicy;
51use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
52use mz_storage_types::sources::{
53 GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
54 SourceExportDetails, Timeline,
55};
56use serde::{Deserialize, Serialize};
57use timely::progress::Antichain;
58use timely::progress::frontier::MutableAntichain;
59use tokio::sync::{mpsc, oneshot};
60
61use crate::client::{AppendOnlyUpdate, StatusUpdate, TableData};
62use crate::statistics::WebhookStatistics;
63
64#[derive(
65 Clone,
66 Copy,
67 Debug,
68 Serialize,
69 Deserialize,
70 Eq,
71 PartialEq,
72 Hash,
73 PartialOrd,
74 Ord
75)]
76pub enum IntrospectionType {
77 /// We're not responsible for appending to this collection automatically, but we should
78 /// automatically bump the write frontier from time to time.
79 SinkStatusHistory,
80 SourceStatusHistory,
81 ShardMapping,
82
83 Frontiers,
84 ReplicaFrontiers,
85
86 ReplicaStatusHistory,
87 ReplicaMetricsHistory,
88 WallclockLagHistory,
89 WallclockLagHistogram,
90
91 // Note that this single-shard introspection source will be changed to per-replica,
92 // once we allow multiplexing multiple sources/sinks on a single cluster.
93 StorageSourceStatistics,
94 StorageSinkStatistics,
95
96 // The below are for statement logging.
97 StatementExecutionHistory,
98 SessionHistory,
99 PreparedStatementHistory,
100 SqlText,
101 // For statement lifecycle logging, which is closely related
102 // to statement logging
103 StatementLifecycleHistory,
104
105 // Collections written by the compute controller.
106 ComputeDependencies,
107 ComputeOperatorHydrationStatus,
108 ComputeMaterializedViewRefreshes,
109 ComputeErrorCounts,
110 ComputeHydrationTimes,
111 ComputeObjectArrangementSizes,
112
113 // Written by the Adapter for tracking AWS PrivateLink Connection Status History
114 PrivatelinkConnectionStatusHistory,
115}
116
/// Describes how data is written to the collection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DataSource {
    /// Ingest data from some external source.
    Ingestion(IngestionDescription),
    /// This source receives its data from the identified ingestion,
    /// from an external object identified using `SourceExportDetails`.
    ///
    /// The referenced ingestion must be created before all of its exports.
    IngestionExport {
        /// The ingestion that produces this export's data.
        ingestion_id: GlobalId,
        /// Identifies the external object backing this export.
        details: SourceExportDetails,
        /// How the data of this export is decoded/handled.
        data_config: SourceExportDataConfig,
    },
    /// Data comes from introspection sources, which the controller itself is
    /// responsible for generating.
    Introspection(IntrospectionType),
    /// Data comes from the source's remapping/reclock operator.
    Progress,
    /// Data comes from external HTTP requests pushed to Materialize.
    Webhook,
    /// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
    Table,
    /// This source's data does not need to be managed by the storage
    /// controller, e.g. it's a materialized view or the catalog collection.
    Other,
    /// This collection is the output collection of a sink.
    Sink {
        /// Describes the sink whose output this collection holds.
        desc: ExportDescription,
    },
}
146
/// Describes a request to create a storage collection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollectionDescription {
    /// The schema of this collection
    pub desc: RelationDesc,
    /// The source of this collection's data.
    pub data_source: DataSource,
    /// An optional frontier to which the collection's `since` should be advanced.
    pub since: Option<Antichain<Timestamp>>,
    /// The timeline of the source. Absent for materialized views, etc.
    pub timeline: Option<Timeline>,
    /// The primary collection of this collection, if any.
    ///
    /// Multiple storage collections can point to the same persist shard,
    /// possibly with different schemas. In such a configuration, we select one
    /// of the involved collections as the primary, who "owns" the persist
    /// shard. All other involved collections have a dependency on the primary.
    pub primary: Option<GlobalId>,
}
166
167impl CollectionDescription {
168 /// Create a CollectionDescription for [`DataSource::Other`].
169 pub fn for_other(desc: RelationDesc, since: Option<Antichain<Timestamp>>) -> Self {
170 Self {
171 desc,
172 data_source: DataSource::Other,
173 since,
174 timeline: None,
175 primary: None,
176 }
177 }
178
179 /// Create a CollectionDescription for a table.
180 pub fn for_table(desc: RelationDesc) -> Self {
181 Self {
182 desc,
183 data_source: DataSource::Table,
184 since: None,
185 timeline: Some(Timeline::EpochMilliseconds),
186 primary: None,
187 }
188 }
189}
190
/// Describes a sink (export) to be installed on a storage instance.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExportDescription<T = mz_repr::Timestamp> {
    /// The description of the sink to create.
    pub sink: StorageSinkDesc<(), T>,
    /// The ID of the instance in which to install the export.
    pub instance_id: StorageInstanceId,
}
197
/// A response from the storage controller, surfaced by
/// [`StorageController::process`].
#[derive(Debug)]
pub enum Response {
    /// Frontier updates for the identified collections.
    FrontierUpdates(Vec<(GlobalId, Antichain<Timestamp>)>),
}
202
/// Metadata that the storage controller must know to properly handle the life
/// cycle of creating and dropping collections.
///
/// This data should be kept consistent with the state modified using
/// [`StorageTxn`].
///
/// n.b. the "txn WAL shard" is also metadata that's persisted, but if we
/// included it in this struct it would never be read.
#[derive(Debug, Clone, Serialize, Default)]
pub struct StorageMetadata {
    /// Maps each collection to the persist shard backing it.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collection_metadata: BTreeMap<GlobalId, ShardId>,
    /// Shards that are no longer in use by an active collection but are yet
    /// to be finalized.
    pub unfinalized_shards: BTreeSet<ShardId>,
}
217
218impl StorageMetadata {
219 pub fn get_collection_shard(&self, id: GlobalId) -> Result<ShardId, StorageError> {
220 let shard_id = self
221 .collection_metadata
222 .get(&id)
223 .ok_or(StorageError::IdentifierMissing(id))?;
224
225 Ok(*shard_id)
226 }
227}
228
/// Provides an interface for the storage controller to read and write data that
/// is recorded elsewhere.
///
/// Data written to the implementor of this trait should make a consistent view
/// of the data available through [`StorageMetadata`].
#[async_trait]
pub trait StorageTxn {
    /// Retrieve all of the visible storage metadata.
    ///
    /// The value of this map should be treated as opaque.
    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId>;

    /// Add new storage metadata for a collection.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must include
    /// this data.
    fn insert_collection_metadata(
        &mut self,
        s: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError>;

    /// Remove the metadata associated with the identified collections.
    ///
    /// Returns the removed `(GlobalId, ShardId)` pairs.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must not
    /// include these keys.
    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)>;

    /// Retrieve all of the shards that are no longer in use by an active
    /// collection but are yet to be finalized.
    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId>;

    /// Insert the specified values as unfinalized shards.
    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError>;

    /// Mark the specified shards as finalized, deleting them from the
    /// unfinalized shard collection.
    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>);

    /// Get the txn WAL shard for this environment if it exists.
    fn get_txn_wal_shard(&self) -> Option<ShardId>;

    /// Store the specified shard as the environment's txn WAL shard.
    ///
    /// The implementor should error if the shard is already specified.
    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError>;
}
275
/// A boxed, sendable, `'static` future, as returned by async controller APIs.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

/// A predicate for a `Row` filter.
pub type RowPredicate = Box<dyn Fn(&Row) -> bool + Send + Sync>;

/// High-level write operations applicable to storage collections.
pub enum StorageWriteOp {
    /// Append a set of rows with specified multiplicities.
    ///
    /// The multiplicities may be negative, so an `Append` operation can perform
    /// both insertions and retractions.
    Append {
        /// The rows to append, each with its multiplicity.
        updates: Vec<(Row, Diff)>,
    },
    /// Delete all rows matching the given predicate.
    Delete {
        /// Rows for which this predicate returns `true` are deleted.
        filter: RowPredicate,
    },
}
291
292impl StorageWriteOp {
293 /// Returns whether this operation appends an empty set of updates.
294 pub fn is_empty_append(&self) -> bool {
295 match self {
296 Self::Append { updates } => updates.is_empty(),
297 Self::Delete { .. } => false,
298 }
299 }
300}
301
#[async_trait(?Send)]
pub trait StorageController: Debug {
    /// Marks the end of any initialization commands.
    ///
    /// The implementor may wait for this method to be called before implementing prior commands,
    /// and so it is important for a user to invoke this method as soon as it is comfortable.
    /// This method can be invoked immediately, at the potential expense of performance.
    fn initialization_complete(&mut self);

    /// Update storage configuration with new parameters.
    fn update_parameters(&mut self, config_params: StorageParameters);

    /// Get the current configuration, including parameters updated with `update_parameters`.
    fn config(&self) -> &StorageConfiguration;

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Returns `true` iff the given collection/ingestion has been hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, StorageError>;

    /// Returns `true` if each non-transient, non-excluded collection is
    /// hydrated on at least one of the provided replicas.
    ///
    /// If no replicas are provided, this checks for hydration on _any_ replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_ids: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError>;

    /// Returns the since/upper frontiers of the identified collection.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<(Antichain<Timestamp>, Antichain<Timestamp>), CollectionMissing>;

    /// Returns the since/upper frontiers of the identified collections.
    ///
    /// Having a method that returns both frontiers at the same time, for all
    /// requested collections, ensures that we can get a consistent "snapshot"
    /// of collection state. If we had separate methods instead, and/or would
    /// allow getting frontiers for collections one at a time, it could happen
    /// that collection state changes concurrently, while information is
    /// gathered.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>, CollectionMissing>;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the IDs of ingestion exports running on the given instance. This
    /// includes the ingestion itself, if any, and running source tables (aka.
    /// subsources).
    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError>;

    /// Creates a storage instance with the specified ID.
    ///
    /// A storage instance can have zero or one replicas. The instance is
    /// created with zero replicas.
    ///
    /// Panics if a storage instance with the given ID already exists.
    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>);

    /// Drops the storage instance with the given ID.
    ///
    /// If you call this method while the storage instance has a replica
    /// attached, that replica will be leaked. Call `drop_replica` first.
    ///
    /// Panics if a storage instance with the given ID does not exist.
    fn drop_instance(&mut self, id: StorageInstanceId);

    /// Updates a storage instance's workload class.
    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    );

    /// Connects the storage instance to the specified replica.
    ///
    /// If the storage instance is already attached to a replica, communication
    /// with that replica is severed in favor of the new replica.
    ///
    /// In the future, this API will be adjusted to support active replication
    /// of storage instances (i.e., multiple replicas attached to a given
    /// storage instance).
    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    );

    /// Disconnects the storage instance from the specified replica.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId);

    /// Across versions of Materialize the nullability of columns for some objects can change based
    /// on updates to our optimizer.
    ///
    /// During bootstrap we will register these new schemas with Persist.
    ///
    /// See: <https://github.com/MaterializeInc/database-issues/issues/2488>
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError>;

    /// Create the sources described in the individual RunIngestionCommand commands.
    ///
    /// Each command carries the source id, the source description, and any associated metadata
    /// needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and they are
    /// now valid to use in queries at times beyond the initial `since` frontiers. Each
    /// collection also acquires a read capability at this frontier, which will need to
    /// be repeatedly downgraded with `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; It can fail between processing of different
    /// collections and leave the controller in an inconsistent state. It is almost
    /// always wrong to do anything but abort the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are available for reads. (We
    /// might later give non-tables the same treatment, but hold off on that initially.) Callers
    /// must provide a Some if any of the collections is a table. A None may be given if none of the
    /// collections are a table (i.e. all materialized views, sources, etc).
    async fn create_collections(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription)>,
    ) -> Result<(), StorageError> {
        // Regular creation is bootstrap creation with no migrated collections.
        self.create_collections_for_bootstrap(
            storage_metadata,
            register_ts,
            collections,
            &BTreeSet::new(),
        )
        .await
    }

    /// Like [`Self::create_collections`], except used specifically for bootstrap.
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Check that the ingestion associated with `id` can use the provided
    /// [`SourceDesc`].
    ///
    /// Note that this check is optimistic and its return of `Ok(())` does not
    /// guarantee that subsequent calls to `alter_ingestion_source_desc` are
    /// guaranteed to succeed.
    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError>;

    /// Alters each identified ingestion to use the correlated [`SourceDesc`].
    async fn alter_ingestion_source_desc(
        &mut self,
        ingestion_ids: BTreeMap<GlobalId, SourceDesc>,
    ) -> Result<(), StorageError>;

    /// Alters each identified collection to use the correlated [`GenericSourceConnection`].
    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError>;

    /// Alters the data config for the specified source exports of the specified ingestions.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError>;

    /// Alters the schema of `existing_collection` to `new_desc`, registering
    /// the result under the `new_collection` ID at `register_ts`, with
    /// `expected_version` guarding against concurrent alterations.
    // NOTE(review): contract inferred from parameter names/types — confirm
    // against the implementation.
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Timestamp,
    ) -> Result<(), StorageError>;

    /// Acquire an immutable reference to the export state, should it exist.
    fn export(&self, id: GlobalId) -> Result<&ExportState, StorageError>;

    /// Acquire a mutable reference to the export state, should it exist.
    fn export_mut(&mut self, id: GlobalId) -> Result<&mut ExportState, StorageError>;

    /// Create a oneshot ingestion.
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError>;

    /// Cancel a oneshot ingestion.
    fn cancel_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) -> Result<(), StorageError>;

    /// Alter the sink identified by the given id to match the provided `ExportDescription`.
    async fn alter_export(
        &mut self,
        id: GlobalId,
        export: ExportDescription,
    ) -> Result<(), StorageError>;

    /// For each identified export, alter its [`StorageSinkConnection`].
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError>;

    /// Drops the read capability for the tables and allows their resources to be reclaimed.
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Timestamp,
    ) -> Result<(), StorageError>;

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    /// controller starts/restarts it has no durable state. That means that it has no way of
    /// remembering any past commands sent. In the future we plan on persisting state for the
    /// controller so that it is aware of past commands.
    /// Therefore this method is for dropping sinks that we know to have been previously
    /// created, but have been forgotten by the controller due to a restart.
    /// Once command history becomes durable we can remove this method and use the normal
    /// `drop_sinks`.
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    /// controller starts/restarts it has no durable state. That means that it has no way of
    /// remembering any past commands sent. In the future we plan on persisting state for the
    /// controller so that it is aware of past commands.
    /// Therefore this method is for dropping sources that we know to have been previously
    /// created, but have been forgotten by the controller due to a restart.
    /// Once command history becomes durable we can remove this method and use the normal
    /// `drop_sources`.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Append `updates` into the local input named `id` and advance its upper to `upper`.
    ///
    /// The method returns a oneshot that can be awaited to indicate completion of the write.
    /// The method may return an error, indicating an immediately visible error, and also the
    /// oneshot may return an error if one is encountered during the write.
    ///
    /// All updates in `commands` are applied atomically.
    // TODO(petrosagg): switch upper to `Antichain<Timestamp>`
    fn append_table(
        &mut self,
        write_ts: Timestamp,
        advance_to: Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<tokio::sync::oneshot::Receiver<Result<(), StorageError>>, StorageError>;

    /// Returns a [`MonotonicAppender`] which is a channel that can be used to monotonically
    /// append to the specified [`GlobalId`].
    fn monotonic_appender(&self, id: GlobalId) -> Result<MonotonicAppender, StorageError>;

    /// Returns a shared [`WebhookStatistics`] which can be used to report user-facing
    /// statistics for this given webhook, specified by the [`GlobalId`].
    ///
    // This is used to support a fairly special case, where a source needs to report statistics
    // from outside the ordinary controller-clusterd path. It's possible to merge this with
    // `monotonic_appender`, whose only current user is webhooks, but given that they will
    // likely be moved to clusterd, we just leave this a special case.
    fn webhook_statistics(&self, id: GlobalId) -> Result<Arc<WebhookStatistics>, StorageError>;

    /// Waits until the controller is ready to process a response.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the owner should call
    /// [`StorageController::process`] to process the ready message.
    ///
    /// This method is cancellation safe.
    async fn ready(&mut self);

    /// Processes the work queued by [`StorageController::ready`].
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response>, anyhow::Error>;

    /// Exposes the internal state of the data shard for debugging and QA.
    ///
    /// We'll be thoughtful about making unnecessary changes, but the **output
    /// of this method needs to be gated from users**, so that it's not subject
    /// to our backward compatibility guarantees.
    ///
    /// TODO: Ideally this would return `impl Serialize` so the caller can do
    /// with it what they like, but that doesn't work in traits yet. The
    /// workaround (an associated type) doesn't work because persist doesn't
    /// want to make the type public. In the meantime, move the `serde_json`
    /// call from the single user into this method.
    async fn inspect_persist_state(&self, id: GlobalId)
    -> Result<serde_json::Value, anyhow::Error>;

    /// Records append-only updates for the given introspection type.
    ///
    /// Rows passed in `updates` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn append_introspection_updates(&mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>);

    /// Records append-only status updates for the given introspection type.
    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    );

    /// Updates the desired state of the given introspection type.
    ///
    /// Rows passed in `op` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp);

    /// Returns a sender for updates to the specified append-only introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with an append-only collection.
    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError>>,
    )>;

    /// Returns a sender for updates to the specified differential introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with a differential collection.
    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError>>)>;

    /// Returns a future that resolves to a timestamp that is "real-time
    /// recent" for the given `source_ids`, waiting at most `timeout` for the
    /// external systems to be consulted.
    // NOTE(review): contract inferred from the method name and signature —
    // confirm against the implementation.
    async fn real_time_recent_timestamp(
        &self,
        source_ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<BoxFuture<Result<Timestamp, StorageError>>, StorageError>;

    /// Returns the state of the [`StorageController`] formatted as JSON.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}
711
712impl DataSource {
713 /// Returns true if the storage controller manages the data shard for this
714 /// source using txn-wal.
715 pub fn in_txns(&self) -> bool {
716 match self {
717 DataSource::Table => true,
718 DataSource::Other
719 | DataSource::Ingestion(_)
720 | DataSource::IngestionExport { .. }
721 | DataSource::Introspection(_)
722 | DataSource::Progress
723 | DataSource::Webhook => false,
724 DataSource::Sink { .. } => false,
725 }
726 }
727}
728
/// A wrapper struct that presents the adapter token to a format that is understandable by persist
/// and also allows us to differentiate between a token being present versus being set for the
/// first time.
///
/// The `None` case round-trips through [`Codec64`] as `0`.
#[derive(PartialEq, Clone, Debug, Default)]
pub struct PersistEpoch(pub Option<NonZeroI64>);
734
735impl Codec64 for PersistEpoch {
736 fn codec_name() -> String {
737 "PersistEpoch".to_owned()
738 }
739
740 fn encode(&self) -> [u8; 8] {
741 self.0.map(NonZeroI64::get).unwrap_or(0).to_le_bytes()
742 }
743
744 fn decode(buf: [u8; 8]) -> Self {
745 Self(NonZeroI64::new(i64::from_le_bytes(buf)))
746 }
747}
748
749impl From<NonZeroI64> for PersistEpoch {
750 fn from(epoch: NonZeroI64) -> Self {
751 Self(Some(epoch))
752 }
753}
754
/// State maintained about individual exports.
#[derive(Debug)]
pub struct ExportState {
    /// Really only for keeping track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<Timestamp>,

    /// The cluster this export is associated with.
    pub cluster_id: StorageInstanceId,

    /// The current since frontier, derived from `write_frontier` using
    /// `read_policy`.
    pub derived_since: Antichain<Timestamp>,

    /// The read holds that this export has on its dependencies (its input and itself). When
    /// the upper of the export changes, we downgrade this, which in turn
    /// downgrades holds we have on our dependencies' sinces.
    pub read_holds: [ReadHold; 2],

    /// The policy used to derive `derived_since` from the write frontier.
    pub read_policy: ReadPolicy,

    /// Reported write frontier.
    pub write_frontier: Antichain<Timestamp>,
}
779
impl ExportState {
    /// Creates the state for an export bound to `cluster_id`, given its read
    /// hold on its input, its hold on itself, its reported write frontier,
    /// and its read policy.
    pub fn new(
        cluster_id: StorageInstanceId,
        read_hold: ReadHold,
        self_hold: ReadHold,
        write_frontier: Antichain<Timestamp>,
        read_policy: ReadPolicy,
    ) -> Self {
        // The initial since is the join of the holds' sinces, i.e. the least
        // frontier at which both holds permit reads.
        let mut dependency_since = Antichain::from_elem(Timestamp::MIN);
        for read_hold in [&read_hold, &self_hold] {
            dependency_since.join_assign(read_hold.since());
        }
        Self {
            read_capabilities: MutableAntichain::from(dependency_since.borrow()),
            cluster_id,
            derived_since: dependency_since,
            // Order matters: `input_hold()` returns the first element.
            read_holds: [read_hold, self_hold],
            read_policy,
            write_frontier,
        }
    }

    /// Returns the cluster to which the export is bound.
    pub fn cluster_id(&self) -> StorageInstanceId {
        self.cluster_id
    }

    /// Returns the export's read hold on its input collection.
    pub fn input_hold(&self) -> &ReadHold {
        &self.read_holds[0]
    }

    /// Returns whether the export was dropped.
    ///
    /// An export counts as dropped once all of its read holds have been
    /// released, i.e. their sinces are the empty frontier.
    pub fn is_dropped(&self) -> bool {
        self.read_holds.iter().all(|h| h.since().is_empty())
    }
}
/// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`].
///
/// See `CollectionManager::monotonic_appender` to acquire a [`MonotonicAppender`].
#[derive(Clone, Debug)]
pub struct MonotonicAppender {
    /// Channel that sends to a [`tokio::task`] which pushes updates to Persist.
    /// Each message pairs the updates to append with a one-shot sender on which
    /// the task reports the outcome of that append.
    tx: mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError>>,
    )>,
}
828
829impl MonotonicAppender {
830 pub fn new(
831 tx: mpsc::UnboundedSender<(
832 Vec<AppendOnlyUpdate>,
833 oneshot::Sender<Result<(), StorageError>>,
834 )>,
835 ) -> Self {
836 MonotonicAppender { tx }
837 }
838
839 pub async fn append(&self, updates: Vec<AppendOnlyUpdate>) -> Result<(), StorageError> {
840 let (tx, rx) = oneshot::channel();
841
842 // Send our update to the CollectionManager.
843 self.tx
844 .send((updates, tx))
845 .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
846
847 // Wait for a response, if we fail to receive then the CollectionManager has gone away.
848 let result = rx
849 .await
850 .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
851
852 result
853 }
854}
855
/// A wallclock lag measurement.
///
/// The enum representation reflects the fact that wallclock lag is undefined for unreadable
/// collections, i.e. collections that contain no readable times.
///
/// Note: variant order matters for the derived `PartialOrd`/`Ord` impls —
/// `Undefined` sorts after every `Seconds` value, consistent with
/// [`WallclockLag::max`] treating `Undefined` as the greater lag.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum WallclockLag {
    /// Lag value in seconds, for readable collections.
    Seconds(u64),
    /// Undefined lag, for unreadable collections.
    Undefined,
}
867
868impl WallclockLag {
869 /// The smallest possible wallclock lag measurement.
870 pub const MIN: Self = Self::Seconds(0);
871
872 /// Return the maximum of two lag values.
873 ///
874 /// We treat `Undefined` lags as greater than `Seconds`, to ensure we never report low lag
875 /// values when a collection was actually unreadable for some amount of time.
876 pub fn max(self, other: Self) -> Self {
877 match (self, other) {
878 (Self::Seconds(a), Self::Seconds(b)) => Self::Seconds(a.max(b)),
879 (Self::Undefined, _) | (_, Self::Undefined) => Self::Undefined,
880 }
881 }
882
883 /// Return the wrapped seconds value, or a default if the lag is `Undefined`.
884 pub fn unwrap_seconds_or(self, default: u64) -> u64 {
885 match self {
886 Self::Seconds(s) => s,
887 Self::Undefined => default,
888 }
889 }
890
891 /// Create a new `WallclockLag` by transforming the wrapped seconds value.
892 pub fn map_seconds(self, f: impl FnOnce(u64) -> u64) -> Self {
893 match self {
894 Self::Seconds(s) => Self::Seconds(f(s)),
895 Self::Undefined => Self::Undefined,
896 }
897 }
898
899 /// Convert this lag value into a [`Datum::Interval`] or [`Datum::Null`].
900 pub fn into_interval_datum(self) -> Datum<'static> {
901 match self {
902 Self::Seconds(secs) => {
903 let micros = i64::try_from(secs * 1_000_000).expect("must fit");
904 Datum::Interval(Interval::new(0, 0, micros))
905 }
906 Self::Undefined => Datum::Null,
907 }
908 }
909
910 /// Convert this lag value into a [`Datum::UInt64`] or [`Datum::Null`].
911 pub fn into_uint64_datum(self) -> Datum<'static> {
912 match self {
913 Self::Seconds(secs) => Datum::UInt64(secs),
914 Self::Undefined => Datum::Null,
915 }
916 }
917}
918
/// The period covered by a wallclock lag histogram, represented as a `[start, end)` range.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WallclockLagHistogramPeriod {
    /// Inclusive start of the period.
    pub start: CheckedTimestamp<DateTime<Utc>>,
    /// Exclusive end of the period.
    pub end: CheckedTimestamp<DateTime<Utc>>,
}
925
926impl WallclockLagHistogramPeriod {
927 /// Construct a `WallclockLagHistogramPeriod` from the given epoch timestamp and dyncfg.
928 pub fn from_epoch_millis(epoch_ms: u64, dyncfg: &ConfigSet) -> Self {
929 let interval = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.get(dyncfg);
930 let interval_ms = u64::try_from(interval.as_millis()).unwrap_or_else(|_| {
931 soft_panic_or_log!("excessive wallclock lag histogram period interval: {interval:?}");
932 let default = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.default();
933 u64::try_from(default.as_millis()).unwrap()
934 });
935 let interval_ms = std::cmp::max(interval_ms, 1);
936
937 let start_ms = epoch_ms - (epoch_ms % interval_ms);
938 let start_dt = mz_ore::now::to_datetime(start_ms);
939 let start = start_dt.try_into().expect("must fit");
940
941 let end_ms = start_ms + interval_ms;
942 let end_dt = mz_ore::now::to_datetime(end_ms);
943 let end = end_dt.try_into().expect("must fit");
944
945 Self { start, end }
946 }
947}
948
#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn lag_writes_by_zero() {
        // With a zero lag, the policy must keep the since frontier flush with
        // the write frontier.
        let policy = ReadPolicy::lag_writes_by(
            mz_repr::Timestamp::default(),
            mz_repr::Timestamp::default(),
        );
        let writes = Antichain::from_elem(mz_repr::Timestamp::from(5));
        assert_eq!(policy.frontier(writes.borrow()), writes);
    }
}
960}