mz_storage/source/
types.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to the source ingestion pipeline/framework.
11
12// https://github.com/tokio-rs/prost/issues/237
13// #![allow(missing_docs)]
14
15use std::collections::BTreeMap;
16use std::convert::Infallible;
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use differential_dataflow::Collection;
24use differential_dataflow::containers::TimelyStack;
25use mz_repr::{Diff, GlobalId, Row};
26use mz_storage_types::errors::{DataflowError, DecodeError};
27use mz_storage_types::sources::SourceTimestamp;
28use mz_timely_util::builder_async::PressOnDropButton;
29use pin_project::pin_project;
30use serde::{Deserialize, Serialize};
31use timely::dataflow::{Scope, ScopeParent, Stream};
32use timely::progress::Antichain;
33use tokio::sync::Semaphore;
34use tokio_util::sync::PollSemaphore;
35
36use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
37use crate::source::RawSourceCreationConfig;
38
/// A progress statistics update emitted by implementors of `SourceRender`.
///
/// Every update carries _aggregated_ counters describing the given source as a whole.
/// Each counter is a 64 bit unsigned integer whose units are implementation-defined.
#[derive(Debug, Clone)]
pub enum ProgressStatisticsUpdate {
    /// Aggregated offset counters for the source.
    // NOTE(review): judging by the variant name this is reported outside of the initial
    // snapshot -- confirm against the source implementations.
    SteadyState {
        /// The aggregated _offset_known_ of the source.
        offset_known: u64,
        /// The aggregated _offset_committed_ of the source.
        offset_committed: u64,
    },
    /// Aggregated record counters for the source.
    // NOTE(review): presumably reported while the source is snapshotting; the exact
    // meaning of "known" vs. "staged" is not visible in this file -- confirm.
    Snapshot {
        /// The aggregated number of records known.
        records_known: u64,
        /// The aggregated number of records staged.
        records_staged: u64,
    },
}
56
57pub type StackedCollection<G, T> =
58    Collection<G, T, Diff, TimelyStack<(T, <G as ScopeParent>::Timestamp, Diff)>>;
59
60/// Describes a source that can render itself in a timely scope.
61pub trait SourceRender {
62    type Time: SourceTimestamp;
63    const STATUS_NAMESPACE: StatusNamespace;
64
65    /// Renders the source in the provided timely scope.
66    ///
67    /// The `resume_uppers` stream can be used by the source to observe the overall progress of the
68    /// ingestion. When a frontier appears in this stream the source implementation can be certain
69    /// that future ingestion instances will request to read the external data only at times beyond
70    /// that frontier. Therefore, the source implementation can react to this stream by e.g
71    /// committing offsets upstream or advancing the LSN of a replication slot. It is safe to
72    /// ignore this argument.
73    ///
74    /// Rendering a source is expected to return four things.
75    ///
76    /// First, a source must produce a collection that is produced by the rendered dataflow and
77    /// must contain *definite*[^1] data for all times beyond the resumption frontier.
78    ///
79    /// Second, a source must produce a progress stream that will be used to drive reclocking.
80    /// The frontier of this stream decides for which upstream offsets bindings can be minted.
81    ///
82    /// Third, a source must produce a stream of health status updates.
83    ///
84    /// Finally, the source is expected to return an opaque token that when dropped will cause the
85    /// source to immediately drop all capabilities and advance its frontier to the empty antichain.
86    ///
87    /// [^1] <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20210831_correctness.md#describing-definite-data>
88    fn render<G: Scope<Timestamp = Self::Time>>(
89        self,
90        scope: &mut G,
91        config: &RawSourceCreationConfig,
92        resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
93        start_signal: impl std::future::Future<Output = ()> + 'static,
94    ) -> (
95        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
96        Stream<G, Infallible>,
97        Stream<G, HealthStatusMessage>,
98        Stream<G, ProgressStatisticsUpdate>,
99        Option<Stream<G, Probe<Self::Time>>>,
100        Vec<PressOnDropButton>,
101    );
102}
103
104/// Source-agnostic wrapper for messages. Each source must implement a
105/// conversion to Message.
106#[derive(Debug, Clone)]
107pub struct SourceMessage {
108    /// The message key
109    pub key: Row,
110    /// The message value
111    pub value: Row,
112    /// Additional metadata columns requested by the user
113    pub metadata: Row,
114}
115
116/// The result of probing an upstream system for its write frontier.
117#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct Probe<T> {
119    /// The timestamp at which this probe was initiated.
120    pub probe_ts: mz_repr::Timestamp,
121    /// The frontier obtain from the upstream system.
122    pub upstream_frontier: Antichain<T>,
123}
124
125mod columnation {
126    use columnation::{Columnation, Region};
127    use mz_repr::Row;
128
129    use super::SourceMessage;
130
131    impl Columnation for SourceMessage {
132        type InnerRegion = SourceMessageRegion;
133    }
134
135    #[derive(Default)]
136    pub struct SourceMessageRegion {
137        inner: <Row as Columnation>::InnerRegion,
138    }
139
140    impl Region for SourceMessageRegion {
141        type Item = SourceMessage;
142
143        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
144            SourceMessage {
145                key: unsafe { self.inner.copy(&item.key) },
146                value: unsafe { self.inner.copy(&item.value) },
147                metadata: unsafe { self.inner.copy(&item.metadata) },
148            }
149        }
150
151        fn clear(&mut self) {
152            self.inner.clear()
153        }
154
155        fn reserve_items<'a, I>(&mut self, items: I)
156        where
157            Self: 'a,
158            I: Iterator<Item = &'a Self::Item> + Clone,
159        {
160            self.inner.reserve_items(
161                items
162                    .map(|item| [&item.key, &item.value, &item.metadata])
163                    .flatten(),
164            )
165        }
166
167        fn reserve_regions<'a, I>(&mut self, regions: I)
168        where
169            Self: 'a,
170            I: Iterator<Item = &'a Self> + Clone,
171        {
172            self.inner.reserve_regions(regions.map(|r| &r.inner))
173        }
174
175        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
176            self.inner.heap_size(callback)
177        }
178    }
179}
180
181/// A record produced by a source
182#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Serialize, Deserialize)]
183pub struct SourceOutput<FromTime> {
184    /// The record's key (or some empty/default value for sources without the concept of key)
185    pub key: Row,
186    /// The record's value
187    pub value: Row,
188    /// Additional metadata columns requested by the user
189    pub metadata: Row,
190    /// The original timestamp of this message
191    pub from_time: FromTime,
192}
193
194/// The output of the decoding operator
195#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
196pub struct DecodeResult<FromTime> {
197    /// The decoded key
198    pub key: Option<Result<Row, DecodeError>>,
199    /// The decoded value, as well as the the
200    /// differential `diff` value for this value, if the value
201    /// is present and not and error.
202    pub value: Option<Result<Row, DecodeError>>,
203    /// Additional metadata requested by the user
204    pub metadata: Row,
205    /// The original timestamp of this message
206    pub from_time: FromTime,
207}
208
209#[pin_project]
210pub struct SignaledFuture<F> {
211    #[pin]
212    fut: F,
213    semaphore: PollSemaphore,
214}
215
216impl<F: Future> SignaledFuture<F> {
217    pub fn new(semaphore: Arc<Semaphore>, fut: F) -> Self {
218        Self {
219            fut,
220            semaphore: PollSemaphore::new(semaphore),
221        }
222    }
223}
224
225impl<F: Future> Future for SignaledFuture<F> {
226    type Output = F::Output;
227
228    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
229        let this = self.project();
230        let permit = ready!(this.semaphore.poll_acquire(cx));
231        let ret = this.fut.poll(cx);
232        drop(permit);
233        ret
234    }
235}