mz_storage_client/controller.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A controller that provides an interface to the storage layer.
11//!
12//! The storage controller curates the creation of sources, the progress of readers through these collections,
13//! and their eventual dropping and resource reclamation.
14//!
15//! The storage controller can be viewed as a partial map from `GlobalId` to collection. It is an error to
16//! use an identifier before it has been "created" with `create_source()`. Once created, the controller holds
17//! a read capability for each source, which is manipulated with `update_read_capabilities()`.
18//! Eventually, the source is dropped with either `drop_sources()` or by allowing compaction to the
19//! empty frontier.
20
21use std::collections::{BTreeMap, BTreeSet};
22use std::fmt::Debug;
23use std::future::Future;
24use std::num::NonZeroI64;
25use std::pin::Pin;
26use std::sync::Arc;
27use std::time::Duration;
28
29use async_trait::async_trait;
30use chrono::{DateTime, Utc};
31use differential_dataflow::lattice::Lattice;
32use mz_cluster_client::ReplicaId;
33use mz_cluster_client::client::ClusterReplicaLocation;
34use mz_controller_types::dyncfgs::WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL;
35use mz_dyncfg::ConfigSet;
36use mz_ore::soft_panic_or_log;
37use mz_persist_client::batch::ProtoBatch;
38use mz_persist_types::{Codec64, ShardId};
39use mz_repr::adt::interval::Interval;
40use mz_repr::adt::timestamp::CheckedTimestamp;
41use mz_repr::{Datum, Diff, GlobalId, RelationDesc, RelationVersion, Row, Timestamp};
42use mz_storage_types::configuration::StorageConfiguration;
43use mz_storage_types::connections::inline::InlinedConnection;
44use mz_storage_types::controller::{CollectionMetadata, StorageError};
45use mz_storage_types::errors::CollectionMissing;
46use mz_storage_types::instances::StorageInstanceId;
47use mz_storage_types::oneshot_sources::{OneshotIngestionRequest, OneshotResultCallback};
48use mz_storage_types::parameters::StorageParameters;
49use mz_storage_types::read_holds::ReadHold;
50use mz_storage_types::read_policy::ReadPolicy;
51use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
52use mz_storage_types::sources::{
53 GenericSourceConnection, IngestionDescription, SourceDesc, SourceExportDataConfig,
54 SourceExportDetails, Timeline,
55};
56use serde::{Deserialize, Serialize};
57use timely::progress::Antichain;
58use timely::progress::frontier::MutableAntichain;
59use tokio::sync::{mpsc, oneshot};
60
61use crate::client::{AppendOnlyUpdate, StatusUpdate, TableData};
62use crate::statistics::WebhookStatistics;
63
64#[derive(
65 Clone,
66 Copy,
67 Debug,
68 Serialize,
69 Deserialize,
70 Eq,
71 PartialEq,
72 Hash,
73 PartialOrd,
74 Ord
75)]
76pub enum IntrospectionType {
77 /// We're not responsible for appending to this collection automatically, but we should
78 /// automatically bump the write frontier from time to time.
79 SinkStatusHistory,
80 SourceStatusHistory,
81 ShardMapping,
82
83 Frontiers,
84 ReplicaFrontiers,
85
86 ReplicaStatusHistory,
87 ReplicaMetricsHistory,
88 WallclockLagHistory,
89 WallclockLagHistogram,
90
91 // Note that this single-shard introspection source will be changed to per-replica,
92 // once we allow multiplexing multiple sources/sinks on a single cluster.
93 StorageSourceStatistics,
94 StorageSinkStatistics,
95
96 // The below are for statement logging.
97 StatementExecutionHistory,
98 SessionHistory,
99 PreparedStatementHistory,
100 SqlText,
101 // For statement lifecycle logging, which is closely related
102 // to statement logging
103 StatementLifecycleHistory,
104
105 // Collections written by the compute controller.
106 ComputeDependencies,
107 ComputeOperatorHydrationStatus,
108 ComputeMaterializedViewRefreshes,
109 ComputeErrorCounts,
110 ComputeHydrationTimes,
111
112 // Written by the Adapter for tracking AWS PrivateLink Connection Status History
113 PrivatelinkConnectionStatusHistory,
114}
115
/// Describes how data is written to the collection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DataSource {
    /// Ingest data from some external source.
    Ingestion(IngestionDescription),
    /// This source receives its data from the identified ingestion,
    /// from an external object identified using `SourceExportDetails`.
    ///
    /// The referenced ingestion must be created before all of its exports.
    IngestionExport {
        /// The ingestion this export reads its data from.
        ingestion_id: GlobalId,
        /// Identifies the external object backing this export.
        details: SourceExportDetails,
        /// Per-export data configuration.
        data_config: SourceExportDataConfig,
    },
    /// Data comes from introspection sources, which the controller itself is
    /// responsible for generating.
    Introspection(IntrospectionType),
    /// Data comes from the source's remapping/reclock operator.
    Progress,
    /// Data comes from external HTTP requests pushed to Materialize.
    Webhook,
    /// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
    Table,
    /// This source's data does not need to be managed by the storage
    /// controller, e.g. it's a materialized view or the catalog collection.
    Other,
    /// This collection is the output collection of a sink.
    Sink {
        /// Description of the sink that writes this collection.
        desc: ExportDescription,
    },
}
145
/// Describes a request to create a source.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollectionDescription {
    /// The schema of this collection
    pub desc: RelationDesc,
    /// The source of this collection's data.
    pub data_source: DataSource,
    /// An optional frontier to which the collection's `since` should be advanced.
    pub since: Option<Antichain<Timestamp>>,
    /// The timeline of the source. Absent for materialized views, continual tasks, etc.
    pub timeline: Option<Timeline>,
    /// The primary collection, if this collection is not itself the primary.
    ///
    /// Multiple storage collections can point to the same persist shard,
    /// possibly with different schemas. In such a configuration, we select one
    /// of the involved collections as the primary, who "owns" the persist
    /// shard. All other involved collections have a dependency on the primary.
    pub primary: Option<GlobalId>,
}
165
166impl CollectionDescription {
167 /// Create a CollectionDescription for [`DataSource::Other`].
168 pub fn for_other(desc: RelationDesc, since: Option<Antichain<Timestamp>>) -> Self {
169 Self {
170 desc,
171 data_source: DataSource::Other,
172 since,
173 timeline: None,
174 primary: None,
175 }
176 }
177
178 /// Create a CollectionDescription for a table.
179 pub fn for_table(desc: RelationDesc) -> Self {
180 Self {
181 desc,
182 data_source: DataSource::Table,
183 since: None,
184 timeline: Some(Timeline::EpochMilliseconds),
185 primary: None,
186 }
187 }
188}
189
/// Describes a sink export to install on a storage instance.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ExportDescription<T = mz_repr::Timestamp> {
    /// The description of the sink to create.
    pub sink: StorageSinkDesc<(), T>,
    /// The ID of the instance in which to install the export.
    pub instance_id: StorageInstanceId,
}
196
/// Responses produced by the storage controller for its client.
#[derive(Debug)]
pub enum Response {
    /// Updated frontiers for the identified collections.
    FrontierUpdates(Vec<(GlobalId, Antichain<Timestamp>)>),
}
201
/// Metadata that the storage controller must know to properly handle the life
/// cycle of creating and dropping collections.
///
/// This data should be kept consistent with the state modified using
/// [`StorageTxn`].
///
/// n.b. the "txn WAL shard" is also metadata that's persisted, but if we
/// included it in this struct it would never be read.
#[derive(Debug, Clone, Serialize, Default)]
pub struct StorageMetadata {
    /// Maps each collection to the persist shard backing it.
    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
    pub collection_metadata: BTreeMap<GlobalId, ShardId>,
    /// Shards that are no longer in use by an active collection but have yet
    /// to be finalized.
    pub unfinalized_shards: BTreeSet<ShardId>,
}
216
217impl StorageMetadata {
218 pub fn get_collection_shard(&self, id: GlobalId) -> Result<ShardId, StorageError> {
219 let shard_id = self
220 .collection_metadata
221 .get(&id)
222 .ok_or(StorageError::IdentifierMissing(id))?;
223
224 Ok(*shard_id)
225 }
226}
227
/// Provides an interface for the storage controller to read and write data that
/// is recorded elsewhere.
///
/// Data written to the implementor of this trait should make a consistent view
/// of the data available through [`StorageMetadata`].
#[async_trait]
pub trait StorageTxn {
    /// Retrieve all of the visible storage metadata.
    ///
    /// The value of this map should be treated as opaque.
    fn get_collection_metadata(&self) -> BTreeMap<GlobalId, ShardId>;

    /// Add new storage metadata for a collection.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must include
    /// this data.
    fn insert_collection_metadata(
        &mut self,
        s: BTreeMap<GlobalId, ShardId>,
    ) -> Result<(), StorageError>;

    /// Remove the metadata associated with the identified collections.
    ///
    /// Subsequent calls to [`StorageTxn::get_collection_metadata`] must not
    /// include these keys.
    ///
    /// Returns the `(GlobalId, ShardId)` pairs that were removed — presumably
    /// so the caller can track the shards for finalization; confirm with
    /// implementors.
    fn delete_collection_metadata(&mut self, ids: BTreeSet<GlobalId>) -> Vec<(GlobalId, ShardId)>;

    /// Retrieve all of the shards that are no longer in use by an active
    /// collection but are yet to be finalized.
    fn get_unfinalized_shards(&self) -> BTreeSet<ShardId>;

    /// Insert the specified values as unfinalized shards.
    fn insert_unfinalized_shards(&mut self, s: BTreeSet<ShardId>) -> Result<(), StorageError>;

    /// Mark the specified shards as finalized, deleting them from the
    /// unfinalized shard collection.
    fn mark_shards_as_finalized(&mut self, shards: BTreeSet<ShardId>);

    /// Get the txn WAL shard for this environment if it exists.
    fn get_txn_wal_shard(&self) -> Option<ShardId>;

    /// Store the specified shard as the environment's txn WAL shard.
    ///
    /// The implementor should error if the shard is already specified.
    fn write_txn_wal_shard(&mut self, shard: ShardId) -> Result<(), StorageError>;
}
274
/// A pinned, boxed, sendable future, as returned by async controller APIs.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

/// A predicate for a `Row` filter.
pub type RowPredicate = Box<dyn Fn(&Row) -> bool + Send + Sync>;
279
/// High-level write operations applicable to storage collections.
pub enum StorageWriteOp {
    /// Append a set of rows with specified multiplicities.
    ///
    /// The multiplicities may be negative, so an `Append` operation can perform
    /// both insertions and retractions.
    Append {
        /// The rows to append, with their (possibly negative) multiplicities.
        updates: Vec<(Row, Diff)>,
    },
    /// Delete all rows matching the given predicate.
    Delete {
        /// Predicate selecting the rows to delete.
        filter: RowPredicate,
    },
}
290
291impl StorageWriteOp {
292 /// Returns whether this operation appends an empty set of updates.
293 pub fn is_empty_append(&self) -> bool {
294 match self {
295 Self::Append { updates } => updates.is_empty(),
296 Self::Delete { .. } => false,
297 }
298 }
299}
300
/// A controller that manages the lifecycle, frontiers, and data of storage
/// collections: sources, tables, sinks, and introspection collections.
#[async_trait(?Send)]
pub trait StorageController: Debug {
    /// Marks the end of any initialization commands.
    ///
    /// The implementor may wait for this method to be called before implementing prior commands,
    /// and so it is important for a user to invoke this method as soon as it is comfortable.
    /// This method can be invoked immediately, at the potential expense of performance.
    fn initialization_complete(&mut self);

    /// Update storage configuration with new parameters.
    fn update_parameters(&mut self, config_params: StorageParameters);

    /// Get the current configuration, including parameters updated with `update_parameters`.
    fn config(&self) -> &StorageConfiguration;

    /// Returns the [CollectionMetadata] of the collection identified by `id`.
    fn collection_metadata(&self, id: GlobalId) -> Result<CollectionMetadata, CollectionMissing>;

    /// Returns `true` iff the given collection/ingestion has been hydrated.
    ///
    /// For this check, zero-replica clusters are always considered hydrated.
    /// Their collections would never normally be considered hydrated but it's
    /// clearly intentional that they have no replicas.
    fn collection_hydrated(&self, collection_id: GlobalId) -> Result<bool, StorageError>;

    /// Returns `true` if each non-transient, non-excluded collection is
    /// hydrated on at least one of the provided replicas.
    ///
    /// If no replicas are provided, this checks for hydration on _any_ replica.
    ///
    /// This also returns `true` in case this cluster does not have any
    /// replicas.
    fn collections_hydrated_on_replicas(
        &self,
        target_replica_ids: Option<Vec<ReplicaId>>,
        target_cluster_ids: &StorageInstanceId,
        exclude_collections: &BTreeSet<GlobalId>,
    ) -> Result<bool, StorageError>;

    /// Returns the since/upper frontiers of the identified collection.
    fn collection_frontiers(
        &self,
        id: GlobalId,
    ) -> Result<(Antichain<Timestamp>, Antichain<Timestamp>), CollectionMissing>;

    /// Returns the since/upper frontiers of the identified collections.
    ///
    /// Having a method that returns both frontiers at the same time, for all
    /// requested collections, ensures that we can get a consistent "snapshot"
    /// of collection state. If we had separate methods instead, and/or would
    /// allow getting frontiers for collections one at a time, it could happen
    /// that collection state changes concurrently, while information is
    /// gathered.
    fn collections_frontiers(
        &self,
        id: Vec<GlobalId>,
    ) -> Result<Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>, CollectionMissing>;

    /// Acquire an iterator over [CollectionMetadata] for all active
    /// collections.
    ///
    /// A collection is "active" when it has a non-empty frontier of read
    /// capabilities.
    fn active_collection_metadatas(&self) -> Vec<(GlobalId, CollectionMetadata)>;

    /// Returns the IDs of ingestion exports running on the given instance. This
    /// includes the ingestion itself, if any, and running source tables (aka.
    /// subsources).
    fn active_ingestion_exports(
        &self,
        instance_id: StorageInstanceId,
    ) -> Box<dyn Iterator<Item = &GlobalId> + '_>;

    /// Checks whether a collection exists under the given `GlobalId`. Returns
    /// an error if the collection does not exist.
    fn check_exists(&self, id: GlobalId) -> Result<(), StorageError>;

    /// Creates a storage instance with the specified ID.
    ///
    /// A storage instance can have zero or one replicas. The instance is
    /// created with zero replicas.
    ///
    /// Panics if a storage instance with the given ID already exists.
    fn create_instance(&mut self, id: StorageInstanceId, workload_class: Option<String>);

    /// Drops the storage instance with the given ID.
    ///
    /// If you call this method while the storage instance has a replica
    /// attached, that replica will be leaked. Call `drop_replica` first.
    ///
    /// Panics if a storage instance with the given ID does not exist.
    fn drop_instance(&mut self, id: StorageInstanceId);

    /// Updates a storage instance's workload class.
    fn update_instance_workload_class(
        &mut self,
        id: StorageInstanceId,
        workload_class: Option<String>,
    );

    /// Connects the storage instance to the specified replica.
    ///
    /// If the storage instance is already attached to a replica, communication
    /// with that replica is severed in favor of the new replica.
    ///
    /// In the future, this API will be adjusted to support active replication
    /// of storage instances (i.e., multiple replicas attached to a given
    /// storage instance).
    fn connect_replica(
        &mut self,
        instance_id: StorageInstanceId,
        replica_id: ReplicaId,
        location: ClusterReplicaLocation,
    );

    /// Disconnects the storage instance from the specified replica.
    fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId);

    /// Across versions of Materialize the nullability of columns for some objects can change based
    /// on updates to our optimizer.
    ///
    /// During bootstrap we will register these new schemas with Persist.
    ///
    /// See: <https://github.com/MaterializeInc/database-issues/issues/2488>
    async fn evolve_nullability_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        collections: Vec<(GlobalId, RelationDesc)>,
    ) -> Result<(), StorageError>;

    /// Create the sources described in the individual RunIngestionCommand commands.
    ///
    /// Each command carries the source id, the source description, and any associated metadata
    /// needed to ingest the particular source.
    ///
    /// This command installs collection state for the indicated sources, and they are
    /// now valid to use in queries at times beyond the initial `since` frontiers. Each
    /// collection also acquires a read capability at this frontier, which will need to
    /// be repeatedly downgraded with `allow_compaction()` to permit compaction.
    ///
    /// This method is NOT idempotent; It can fail between processing of different
    /// collections and leave the controller in an inconsistent state. It is almost
    /// always wrong to do anything but abort the process on `Err`.
    ///
    /// The `register_ts` is used as the initial timestamp that tables are available for reads. (We
    /// might later give non-tables the same treatment, but hold off on that initially.) Callers
    /// must provide a Some if any of the collections is a table. A None may be given if none of the
    /// collections are a table (i.e. all materialized views, sources, etc).
    async fn create_collections(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription)>,
    ) -> Result<(), StorageError> {
        // Delegates to the bootstrap variant with no migrated collections.
        self.create_collections_for_bootstrap(
            storage_metadata,
            register_ts,
            collections,
            &BTreeSet::new(),
        )
        .await
    }

    /// Like [`Self::create_collections`], except used specifically for bootstrap.
    ///
    /// `migrated_storage_collections` is a set of migrated storage collections to be excluded
    /// from the txn-wal sub-system.
    async fn create_collections_for_bootstrap(
        &mut self,
        storage_metadata: &StorageMetadata,
        register_ts: Option<Timestamp>,
        collections: Vec<(GlobalId, CollectionDescription)>,
        migrated_storage_collections: &BTreeSet<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Check that the ingestion associated with `id` can use the provided
    /// [`SourceDesc`].
    ///
    /// Note that this check is optimistic and its return of `Ok(())` does not
    /// guarantee that subsequent calls to `alter_ingestion_source_desc` are
    /// guaranteed to succeed.
    fn check_alter_ingestion_source_desc(
        &mut self,
        ingestion_id: GlobalId,
        source_desc: &SourceDesc,
    ) -> Result<(), StorageError>;

    /// Alters each identified ingestion to use the correlated [`SourceDesc`].
    async fn alter_ingestion_source_desc(
        &mut self,
        ingestion_ids: BTreeMap<GlobalId, SourceDesc>,
    ) -> Result<(), StorageError>;

    /// Alters each identified collection to use the correlated [`GenericSourceConnection`].
    async fn alter_ingestion_connections(
        &mut self,
        source_connections: BTreeMap<GlobalId, GenericSourceConnection<InlinedConnection>>,
    ) -> Result<(), StorageError>;

    /// Alters the data config for the specified source exports of the specified ingestions.
    async fn alter_ingestion_export_data_configs(
        &mut self,
        source_exports: BTreeMap<GlobalId, SourceExportDataConfig>,
    ) -> Result<(), StorageError>;

    /// Alters the schema of an existing table, replacing `existing_collection`
    /// with `new_collection` at `new_desc`/`expected_version`, registered at
    /// `register_ts`.
    ///
    /// NOTE(review): semantics inferred from the signature (versioned table
    /// schema evolution) — confirm against the implementor.
    async fn alter_table_desc(
        &mut self,
        existing_collection: GlobalId,
        new_collection: GlobalId,
        new_desc: RelationDesc,
        expected_version: RelationVersion,
        register_ts: Timestamp,
    ) -> Result<(), StorageError>;

    /// Acquire an immutable reference to the export state, should it exist.
    fn export(&self, id: GlobalId) -> Result<&ExportState, StorageError>;

    /// Acquire a mutable reference to the export state, should it exist.
    fn export_mut(&mut self, id: GlobalId) -> Result<&mut ExportState, StorageError>;

    /// Create a oneshot ingestion.
    async fn create_oneshot_ingestion(
        &mut self,
        ingestion_id: uuid::Uuid,
        collection_id: GlobalId,
        instance_id: StorageInstanceId,
        request: OneshotIngestionRequest,
        result_tx: OneshotResultCallback<ProtoBatch>,
    ) -> Result<(), StorageError>;

    /// Cancel a oneshot ingestion.
    fn cancel_oneshot_ingestion(&mut self, ingestion_id: uuid::Uuid) -> Result<(), StorageError>;

    /// Alter the sink identified by the given id to match the provided `ExportDescription`.
    async fn alter_export(
        &mut self,
        id: GlobalId,
        export: ExportDescription,
    ) -> Result<(), StorageError>;

    /// For each identified export, alter its [`StorageSinkConnection`].
    async fn alter_export_connections(
        &mut self,
        exports: BTreeMap<GlobalId, StorageSinkConnection>,
    ) -> Result<(), StorageError>;

    /// Drops the read capability for the tables and allows their resources to be reclaimed.
    fn drop_tables(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
        ts: Timestamp,
    ) -> Result<(), StorageError>;

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    fn drop_sources(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    fn drop_sinks(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Drops the read capability for the sinks and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    /// controller starts/restarts it has no durable state. That means that it has no way of
    /// remembering any past commands sent. In the future we plan on persisting state for the
    /// controller so that it is aware of past commands.
    /// Therefore this method is for dropping sinks that we know to have been previously
    /// created, but have been forgotten by the controller due to a restart.
    /// Once command history becomes durable we can remove this method and use the normal
    /// `drop_sinks`.
    fn drop_sinks_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    );

    /// Drops the read capability for the sources and allows their resources to be reclaimed.
    ///
    /// TODO(jkosh44): This method does not validate the provided identifiers. Currently when the
    /// controller starts/restarts it has no durable state. That means that it has no way of
    /// remembering any past commands sent. In the future we plan on persisting state for the
    /// controller so that it is aware of past commands.
    /// Therefore this method is for dropping sources that we know to have been previously
    /// created, but have been forgotten by the controller due to a restart.
    /// Once command history becomes durable we can remove this method and use the normal
    /// `drop_sources`.
    fn drop_sources_unvalidated(
        &mut self,
        storage_metadata: &StorageMetadata,
        identifiers: Vec<GlobalId>,
    ) -> Result<(), StorageError>;

    /// Append `updates` into the local input named `id` and advance its upper to `upper`.
    ///
    /// The method returns a oneshot that can be awaited to indicate completion of the write.
    /// The method may return an error, indicating an immediately visible error, and also the
    /// oneshot may return an error if one is encountered during the write.
    ///
    /// All updates in `commands` are applied atomically.
    // TODO(petrosagg): switch upper to `Antichain<Timestamp>`
    fn append_table(
        &mut self,
        write_ts: Timestamp,
        advance_to: Timestamp,
        commands: Vec<(GlobalId, Vec<TableData>)>,
    ) -> Result<tokio::sync::oneshot::Receiver<Result<(), StorageError>>, StorageError>;

    /// Returns a [`MonotonicAppender`] which is a channel that can be used to monotonically
    /// append to the specified [`GlobalId`].
    fn monotonic_appender(&self, id: GlobalId) -> Result<MonotonicAppender, StorageError>;

    /// Returns a shared [`WebhookStatistics`] which can be used to report user-facing
    /// statistics for this given webhook, specified by the [`GlobalId`].
    ///
    // This is used to support a fairly special case, where a source needs to report statistics
    // from outside the ordinary controller-clusterd path. Its possible to merge this with
    // `monotonic_appender`, whose only current user is webhooks, but given that they will
    // likely be moved to clusterd, we just leave this a special case.
    fn webhook_statistics(&self, id: GlobalId) -> Result<Arc<WebhookStatistics>, StorageError>;

    /// Waits until the controller is ready to process a response.
    ///
    /// This method may block for an arbitrarily long time.
    ///
    /// When the method returns, the owner should call
    /// [`StorageController::process`] to process the ready message.
    ///
    /// This method is cancellation safe.
    async fn ready(&mut self);

    /// Processes the work queued by [`StorageController::ready`].
    fn process(
        &mut self,
        storage_metadata: &StorageMetadata,
    ) -> Result<Option<Response>, anyhow::Error>;

    /// Exposes the internal state of the data shard for debugging and QA.
    ///
    /// We'll be thoughtful about making unnecessary changes, but the **output
    /// of this method needs to be gated from users**, so that it's not subject
    /// to our backward compatibility guarantees.
    ///
    /// TODO: Ideally this would return `impl Serialize` so the caller can do
    /// with it what they like, but that doesn't work in traits yet. The
    /// workaround (an associated type) doesn't work because persist doesn't
    /// want to make the type public. In the meantime, move the `serde_json`
    /// call from the single user into this method.
    async fn inspect_persist_state(&self, id: GlobalId)
    -> Result<serde_json::Value, anyhow::Error>;

    /// Records append-only updates for the given introspection type.
    ///
    /// Rows passed in `updates` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn append_introspection_updates(&mut self, type_: IntrospectionType, updates: Vec<(Row, Diff)>);

    /// Records append-only status updates for the given introspection type.
    fn append_status_introspection_updates(
        &mut self,
        type_: IntrospectionType,
        updates: Vec<StatusUpdate>,
    );

    /// Updates the desired state of the given introspection type.
    ///
    /// Rows passed in `op` MUST have the correct schema for the given
    /// introspection type, as readers rely on this and might panic otherwise.
    fn update_introspection_collection(&mut self, type_: IntrospectionType, op: StorageWriteOp);

    /// Returns a sender for updates to the specified append-only introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with an append-only collection.
    fn append_only_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError>>,
    )>;

    /// Returns a sender for updates to the specified differential introspection collection.
    ///
    /// # Panics
    ///
    /// Panics if the given introspection type is not associated with a differential collection.
    fn differential_introspection_tx(
        &self,
        type_: IntrospectionType,
    ) -> mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError>>)>;

    /// Returns a future resolving to a timestamp that is "real-time recent"
    /// for the given sources, or an error if one cannot be determined within
    /// `timeout`.
    ///
    /// NOTE(review): recency semantics inferred from the name and signature —
    /// confirm against the implementor.
    async fn real_time_recent_timestamp(
        &self,
        source_ids: BTreeSet<GlobalId>,
        timeout: Duration,
    ) -> Result<BoxFuture<Result<Timestamp, StorageError>>, StorageError>;

    /// Returns the state of the [`StorageController`] formatted as JSON.
    fn dump(&self) -> Result<serde_json::Value, anyhow::Error>;
}
710
711impl DataSource {
712 /// Returns true if the storage controller manages the data shard for this
713 /// source using txn-wal.
714 pub fn in_txns(&self) -> bool {
715 match self {
716 DataSource::Table => true,
717 DataSource::Other
718 | DataSource::Ingestion(_)
719 | DataSource::IngestionExport { .. }
720 | DataSource::Introspection(_)
721 | DataSource::Progress
722 | DataSource::Webhook => false,
723 DataSource::Sink { .. } => false,
724 }
725 }
726}
727
/// A wrapper struct that presents the adapter token to a format that is understandable by persist
/// and also allows us to differentiate between a token being present versus being set for the
/// first time.
///
/// The inner value is `None` when no epoch has been set; the `Codec64` impl
/// encodes that case as 0.
#[derive(PartialEq, Clone, Debug, Default)]
pub struct PersistEpoch(pub Option<NonZeroI64>);
733
734impl Codec64 for PersistEpoch {
735 fn codec_name() -> String {
736 "PersistEpoch".to_owned()
737 }
738
739 fn encode(&self) -> [u8; 8] {
740 self.0.map(NonZeroI64::get).unwrap_or(0).to_le_bytes()
741 }
742
743 fn decode(buf: [u8; 8]) -> Self {
744 Self(NonZeroI64::new(i64::from_le_bytes(buf)))
745 }
746}
747
748impl From<NonZeroI64> for PersistEpoch {
749 fn from(epoch: NonZeroI64) -> Self {
750 Self(Some(epoch))
751 }
752}
753
/// State maintained about individual exports.
#[derive(Debug)]
pub struct ExportState {
    /// Really only for keeping track of changes to the `derived_since`.
    pub read_capabilities: MutableAntichain<Timestamp>,

    /// The cluster this export is associated with.
    pub cluster_id: StorageInstanceId,

    /// The current since frontier, derived from `write_frontier` using
    /// `hold_policy`.
    pub derived_since: Antichain<Timestamp>,

    /// The read holds that this export has on its dependencies (its input and itself). When
    /// the upper of the export changes, we downgrade this, which in turn
    /// downgrades holds we have on our dependencies' sinces.
    ///
    /// By convention (see `ExportState::new`), index 0 is the hold on the
    /// input and index 1 is the hold on the export's own collection.
    pub read_holds: [ReadHold; 2],

    /// The policy to use to downgrade `self.read_capability`.
    pub read_policy: ReadPolicy,

    /// Reported write frontier.
    pub write_frontier: Antichain<Timestamp>,
}
778
779impl ExportState {
780 pub fn new(
781 cluster_id: StorageInstanceId,
782 read_hold: ReadHold,
783 self_hold: ReadHold,
784 write_frontier: Antichain<Timestamp>,
785 read_policy: ReadPolicy,
786 ) -> Self {
787 let mut dependency_since = Antichain::from_elem(Timestamp::MIN);
788 for read_hold in [&read_hold, &self_hold] {
789 dependency_since.join_assign(read_hold.since());
790 }
791 Self {
792 read_capabilities: MutableAntichain::from(dependency_since.borrow()),
793 cluster_id,
794 derived_since: dependency_since,
795 read_holds: [read_hold, self_hold],
796 read_policy,
797 write_frontier,
798 }
799 }
800
801 /// Returns the cluster to which the export is bound.
802 pub fn cluster_id(&self) -> StorageInstanceId {
803 self.cluster_id
804 }
805
806 /// Returns the cluster to which the export is bound.
807 pub fn input_hold(&self) -> &ReadHold {
808 &self.read_holds[0]
809 }
810
811 /// Returns whether the export was dropped.
812 pub fn is_dropped(&self) -> bool {
813 self.read_holds.iter().all(|h| h.since().is_empty())
814 }
815}
/// A channel that allows you to append a set of updates to a pre-defined [`GlobalId`].
///
/// See `CollectionManager::monotonic_appender` to acquire a [`MonotonicAppender`].
#[derive(Clone, Debug)]
pub struct MonotonicAppender {
    /// Channel that sends to a [`tokio::task`] which pushes updates to Persist.
    /// Each message carries the updates plus a oneshot sender on which that
    /// task reports the outcome of the append.
    tx: mpsc::UnboundedSender<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError>>,
    )>,
}
827
828impl MonotonicAppender {
829 pub fn new(
830 tx: mpsc::UnboundedSender<(
831 Vec<AppendOnlyUpdate>,
832 oneshot::Sender<Result<(), StorageError>>,
833 )>,
834 ) -> Self {
835 MonotonicAppender { tx }
836 }
837
838 pub async fn append(&self, updates: Vec<AppendOnlyUpdate>) -> Result<(), StorageError> {
839 let (tx, rx) = oneshot::channel();
840
841 // Send our update to the CollectionManager.
842 self.tx
843 .send((updates, tx))
844 .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
845
846 // Wait for a response, if we fail to receive then the CollectionManager has gone away.
847 let result = rx
848 .await
849 .map_err(|_| StorageError::ShuttingDown("collection manager"))?;
850
851 result
852 }
853}
854
/// A wallclock lag measurement.
///
/// The enum representation reflects the fact that wallclock lag is undefined for unreadable
/// collections, i.e. collections that contain no readable times.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum WallclockLag {
    /// Lag value in seconds, for readable collections.
    Seconds(u64),
    /// Undefined lag, for unreadable collections.
    ///
    /// Note that the derived `Ord` makes `Undefined` compare greater than any
    /// `Seconds` value, matching the semantics of `WallclockLag::max`.
    Undefined,
}
866
867impl WallclockLag {
868 /// The smallest possible wallclock lag measurement.
869 pub const MIN: Self = Self::Seconds(0);
870
871 /// Return the maximum of two lag values.
872 ///
873 /// We treat `Undefined` lags as greater than `Seconds`, to ensure we never report low lag
874 /// values when a collection was actually unreadable for some amount of time.
875 pub fn max(self, other: Self) -> Self {
876 match (self, other) {
877 (Self::Seconds(a), Self::Seconds(b)) => Self::Seconds(a.max(b)),
878 (Self::Undefined, _) | (_, Self::Undefined) => Self::Undefined,
879 }
880 }
881
882 /// Return the wrapped seconds value, or a default if the lag is `Undefined`.
883 pub fn unwrap_seconds_or(self, default: u64) -> u64 {
884 match self {
885 Self::Seconds(s) => s,
886 Self::Undefined => default,
887 }
888 }
889
890 /// Create a new `WallclockLag` by transforming the wrapped seconds value.
891 pub fn map_seconds(self, f: impl FnOnce(u64) -> u64) -> Self {
892 match self {
893 Self::Seconds(s) => Self::Seconds(f(s)),
894 Self::Undefined => Self::Undefined,
895 }
896 }
897
898 /// Convert this lag value into a [`Datum::Interval`] or [`Datum::Null`].
899 pub fn into_interval_datum(self) -> Datum<'static> {
900 match self {
901 Self::Seconds(secs) => {
902 let micros = i64::try_from(secs * 1_000_000).expect("must fit");
903 Datum::Interval(Interval::new(0, 0, micros))
904 }
905 Self::Undefined => Datum::Null,
906 }
907 }
908
909 /// Convert this lag value into a [`Datum::UInt64`] or [`Datum::Null`].
910 pub fn into_uint64_datum(self) -> Datum<'static> {
911 match self {
912 Self::Seconds(secs) => Datum::UInt64(secs),
913 Self::Undefined => Datum::Null,
914 }
915 }
916}
917
/// The period covered by a wallclock lag histogram, represented as a `[start, end)` range.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct WallclockLagHistogramPeriod {
    /// Inclusive start of the period.
    pub start: CheckedTimestamp<DateTime<Utc>>,
    /// Exclusive end of the period.
    pub end: CheckedTimestamp<DateTime<Utc>>,
}
924
925impl WallclockLagHistogramPeriod {
926 /// Construct a `WallclockLagHistogramPeriod` from the given epoch timestamp and dyncfg.
927 pub fn from_epoch_millis(epoch_ms: u64, dyncfg: &ConfigSet) -> Self {
928 let interval = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.get(dyncfg);
929 let interval_ms = u64::try_from(interval.as_millis()).unwrap_or_else(|_| {
930 soft_panic_or_log!("excessive wallclock lag histogram period interval: {interval:?}");
931 let default = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.default();
932 u64::try_from(default.as_millis()).unwrap()
933 });
934 let interval_ms = std::cmp::max(interval_ms, 1);
935
936 let start_ms = epoch_ms - (epoch_ms % interval_ms);
937 let start_dt = mz_ore::now::to_datetime(start_ms);
938 let start = start_dt.try_into().expect("must fit");
939
940 let end_ms = start_ms + interval_ms;
941 let end_dt = mz_ore::now::to_datetime(end_ms);
942 let end = end_dt.try_into().expect("must fit");
943
944 Self { start, end }
945 }
946}
947
#[cfg(test)]
mod tests {
    use super::*;

    /// A `lag_writes_by` policy with a zero lag must leave the write frontier
    /// unchanged when deriving the read frontier.
    #[mz_ore::test]
    fn lag_writes_by_zero() {
        let policy =
            ReadPolicy::lag_writes_by(mz_repr::Timestamp::default(), mz_repr::Timestamp::default());
        let write_frontier = Antichain::from_elem(mz_repr::Timestamp::from(5));
        assert_eq!(policy.frontier(write_frontier.borrow()), write_frontier);
    }
}
959}