Skip to main content

mz_storage/source/
types.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to the source ingestion pipeline/framework.
11
12// https://github.com/tokio-rs/prost/issues/237
13// #![allow(missing_docs)]
14
15use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll, ready};
21
22use differential_dataflow::Collection;
23use differential_dataflow::containers::TimelyStack;
24use mz_repr::{Diff, GlobalId, Row};
25use mz_storage_types::errors::{DataflowError, DecodeError};
26use mz_storage_types::sources::SourceTimestamp;
27use mz_timely_util::builder_async::PressOnDropButton;
28use pin_project::pin_project;
29use serde::{Deserialize, Serialize};
30use timely::dataflow::{Scope, ScopeParent, StreamVec};
31use timely::progress::Antichain;
32use tokio::sync::Semaphore;
33use tokio_util::sync::PollSemaphore;
34
35use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
36use crate::source::RawSourceCreationConfig;
37
/// An update produced by implementors of [`SourceRender`] that presents an _aggregated_
/// description of a source's progress.
///
/// Every reported quantity is a 64 bit unsigned integer whose units are
/// implementation-defined.
#[derive(Clone, Debug)]
pub enum ProgressStatisticsUpdate {
    /// Progress expressed as aggregated upstream offsets.
    ///
    /// NOTE(review): the variant name suggests this is reported during steady-state
    /// (post-snapshot) ingestion — confirm against the implementors.
    SteadyState {
        /// Aggregated `offset_known` of the source.
        offset_known: u64,
        /// Aggregated `offset_committed` of the source.
        offset_committed: u64,
    },
    /// Progress expressed as aggregated record counts.
    ///
    /// NOTE(review): presumably reported while a snapshot is being taken, with
    /// `records_known` the total records expected and `records_staged` those staged so
    /// far — confirm against the implementors.
    Snapshot {
        /// Aggregated `records_known` of the source.
        records_known: u64,
        /// Aggregated `records_staged` of the source.
        records_staged: u64,
    },
}
55
56pub type StackedCollection<G, T> =
57    Collection<G, TimelyStack<(T, <G as ScopeParent>::Timestamp, Diff)>>;
58
59/// Describes a source that can render itself in a timely scope.
60pub trait SourceRender {
61    type Time: SourceTimestamp;
62    const STATUS_NAMESPACE: StatusNamespace;
63
64    /// Renders the source in the provided timely scope.
65    ///
66    /// The `resume_uppers` stream can be used by the source to observe the overall progress of the
67    /// ingestion. When a frontier appears in this stream the source implementation can be certain
68    /// that future ingestion instances will request to read the external data only at times beyond
69    /// that frontier. Therefore, the source implementation can react to this stream by e.g
70    /// committing offsets upstream or advancing the LSN of a replication slot. It is safe to
71    /// ignore this argument.
72    ///
73    /// Rendering a source is expected to return four things.
74    ///
75    /// First, a source must produce a collection that is produced by the rendered dataflow and
76    /// must contain *definite*[^1] data for all times beyond the resumption frontier.
77    ///
78    /// Second, a source must produce a stream of health status updates.
79    ///
80    /// Third, a source must produce a probe stream that periodically reports the upstream
81    /// frontier. This is used to drive reclocking and mint new bindings.
82    ///
83    /// Finally, the source is expected to return an opaque token that when dropped will cause the
84    /// source to immediately drop all capabilities and advance its frontier to the empty antichain.
85    ///
86    /// [^1]: <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20210831_correctness.md#describing-definite-data>
87    fn render<G: Scope<Timestamp = Self::Time>>(
88        self,
89        scope: &mut G,
90        config: &RawSourceCreationConfig,
91        resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
92        start_signal: impl std::future::Future<Output = ()> + 'static,
93    ) -> (
94        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
95        StreamVec<G, HealthStatusMessage>,
96        StreamVec<G, Probe<Self::Time>>,
97        Vec<PressOnDropButton>,
98    );
99}
100
101/// Source-agnostic wrapper for messages. Each source must implement a
102/// conversion to Message.
103#[derive(Debug, Clone)]
104pub struct SourceMessage {
105    /// The message key
106    pub key: Row,
107    /// The message value
108    pub value: Row,
109    /// Additional metadata columns requested by the user
110    pub metadata: Row,
111}
112
113/// The result of probing an upstream system for its write frontier.
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct Probe<T> {
116    /// The timestamp at which this probe was initiated.
117    pub probe_ts: mz_repr::Timestamp,
118    /// The frontier obtain from the upstream system.
119    pub upstream_frontier: Antichain<T>,
120}
121
122mod columnation {
123    use columnation::{Columnation, Region};
124    use mz_repr::Row;
125
126    use super::SourceMessage;
127
128    impl Columnation for SourceMessage {
129        type InnerRegion = SourceMessageRegion;
130    }
131
132    #[derive(Default)]
133    pub struct SourceMessageRegion {
134        inner: <Row as Columnation>::InnerRegion,
135    }
136
137    impl Region for SourceMessageRegion {
138        type Item = SourceMessage;
139
140        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
141            SourceMessage {
142                key: unsafe { self.inner.copy(&item.key) },
143                value: unsafe { self.inner.copy(&item.value) },
144                metadata: unsafe { self.inner.copy(&item.metadata) },
145            }
146        }
147
148        fn clear(&mut self) {
149            self.inner.clear()
150        }
151
152        fn reserve_items<'a, I>(&mut self, items: I)
153        where
154            Self: 'a,
155            I: Iterator<Item = &'a Self::Item> + Clone,
156        {
157            self.inner.reserve_items(
158                items
159                    .map(|item| [&item.key, &item.value, &item.metadata])
160                    .flatten(),
161            )
162        }
163
164        fn reserve_regions<'a, I>(&mut self, regions: I)
165        where
166            Self: 'a,
167            I: Iterator<Item = &'a Self> + Clone,
168        {
169            self.inner.reserve_regions(regions.map(|r| &r.inner))
170        }
171
172        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
173            self.inner.heap_size(callback)
174        }
175    }
176}
177
178/// A record produced by a source
179#[derive(
180    Debug,
181    PartialEq,
182    Eq,
183    Hash,
184    PartialOrd,
185    Ord,
186    Clone,
187    Serialize,
188    Deserialize
189)]
190pub struct SourceOutput<FromTime> {
191    /// The record's key (or some empty/default value for sources without the concept of key)
192    pub key: Row,
193    /// The record's value
194    pub value: Row,
195    /// Additional metadata columns requested by the user
196    pub metadata: Row,
197    /// The original timestamp of this message
198    pub from_time: FromTime,
199}
200
201/// The output of the decoding operator
202#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
203pub struct DecodeResult<FromTime> {
204    /// The decoded key
205    pub key: Option<Result<Row, DecodeError>>,
206    /// The decoded value, as well as the the
207    /// differential `diff` value for this value, if the value
208    /// is present and not and error.
209    pub value: Option<Result<Row, DecodeError>>,
210    /// Additional metadata requested by the user
211    pub metadata: Row,
212    /// The original timestamp of this message
213    pub from_time: FromTime,
214}
215
216#[pin_project]
217pub struct SignaledFuture<F> {
218    #[pin]
219    fut: F,
220    semaphore: PollSemaphore,
221}
222
223impl<F: Future> SignaledFuture<F> {
224    pub fn new(semaphore: Arc<Semaphore>, fut: F) -> Self {
225        Self {
226            fut,
227            semaphore: PollSemaphore::new(semaphore),
228        }
229    }
230}
231
232impl<F: Future> Future for SignaledFuture<F> {
233    type Output = F::Output;
234
235    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
236        let this = self.project();
237        let permit = ready!(this.semaphore.poll_acquire(cx));
238        let ret = this.fut.poll(cx);
239        drop(permit);
240        ret
241    }
242}