Skip to main content

mz_storage/source/
types.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to the source ingestion pipeline/framework.
11
12// https://github.com/tokio-rs/prost/issues/237
13// #![allow(missing_docs)]
14
15use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll, ready};
21
22use differential_dataflow::Collection;
23use mz_repr::{Diff, GlobalId, Row};
24use mz_storage_types::errors::{DataflowError, DecodeError};
25use mz_storage_types::sources::SourceTimestamp;
26use mz_timely_util::builder_async::PressOnDropButton;
27use pin_project::pin_project;
28use serde::{Deserialize, Serialize};
29use timely::dataflow::{Scope, StreamVec};
30use timely::progress::Antichain;
31use tokio::sync::Semaphore;
32use tokio_util::sync::PollSemaphore;
33
34use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
35use crate::source::RawSourceCreationConfig;
36
/// An update produced by implementors of `SourceRender` that presents an _aggregated_
/// description of a source's progress.
///
/// Two shapes of update exist, one per variant. Every counter is a 64-bit unsigned integer
/// whose units are implementation-defined.
#[derive(Clone, Debug)]
pub enum ProgressStatisticsUpdate {
    /// Steady-state progress, expressed as aggregate offsets.
    SteadyState {
        /// Aggregate of the offsets known for this source.
        offset_known: u64,
        /// Aggregate of the offsets committed for this source.
        offset_committed: u64,
    },
    /// Snapshot progress, expressed as aggregate record counts.
    ///
    /// NOTE(review): presumably only reported while a snapshot is in progress — confirm
    /// against implementors.
    Snapshot {
        /// Aggregate number of records known to belong to the snapshot.
        records_known: u64,
        /// Aggregate number of snapshot records staged so far.
        records_staged: u64,
    },
}
54
55pub type StackedCollection<'scope, T, D> = Collection<'scope, T, Vec<(D, T, Diff)>>;
56
57/// Describes a source that can render itself in a timely scope.
58pub trait SourceRender {
59    type Time: SourceTimestamp;
60    const STATUS_NAMESPACE: StatusNamespace;
61
62    /// Renders the source in the provided timely scope.
63    ///
64    /// The `resume_uppers` stream can be used by the source to observe the overall progress of the
65    /// ingestion. When a frontier appears in this stream the source implementation can be certain
66    /// that future ingestion instances will request to read the external data only at times beyond
67    /// that frontier. Therefore, the source implementation can react to this stream by e.g
68    /// committing offsets upstream or advancing the LSN of a replication slot. It is safe to
69    /// ignore this argument.
70    ///
71    /// Rendering a source is expected to return four things.
72    ///
73    /// First, a source must produce a collection that is produced by the rendered dataflow and
74    /// must contain *definite*[^1] data for all times beyond the resumption frontier.
75    ///
76    /// Second, a source must produce a stream of health status updates.
77    ///
78    /// Third, a source must produce a probe stream that periodically reports the upstream
79    /// frontier. This is used to drive reclocking and mint new bindings.
80    ///
81    /// Finally, the source is expected to return an opaque token that when dropped will cause the
82    /// source to immediately drop all capabilities and advance its frontier to the empty antichain.
83    ///
84    /// [^1]: <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20210831_correctness.md#describing-definite-data>
85    fn render<'scope>(
86        self,
87        scope: Scope<'scope, Self::Time>,
88        config: &RawSourceCreationConfig,
89        resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
90        start_signal: impl std::future::Future<Output = ()> + 'static,
91    ) -> (
92        BTreeMap<
93            GlobalId,
94            StackedCollection<'scope, Self::Time, Result<SourceMessage, DataflowError>>,
95        >,
96        StreamVec<'scope, Self::Time, HealthStatusMessage>,
97        StreamVec<'scope, Self::Time, Probe<Self::Time>>,
98        Vec<PressOnDropButton>,
99    );
100}
101
102/// Source-agnostic wrapper for messages. Each source must implement a
103/// conversion to Message.
104#[derive(Debug, Clone)]
105pub struct SourceMessage {
106    /// The message key
107    pub key: Row,
108    /// The message value
109    pub value: Row,
110    /// Additional metadata columns requested by the user
111    pub metadata: Row,
112}
113
114impl SourceMessage {
115    /// Heap-size estimate used to drive fuel accounting in source operators.
116    pub fn byte_len(&self) -> usize {
117        self.key.byte_len() + self.value.byte_len() + self.metadata.byte_len()
118    }
119}
120
/// Size estimate that source operators charge against their fuel budget when deciding
/// whether `give_fueled` should yield.
///
/// Values that live entirely on the stack may simply report `size_of_val(self)`. Values
/// that own heap-allocated payloads (rows, byte buffers) should include that payload in
/// the figure they report.
pub trait FuelSize {
    /// Returns the number of fuel units this value costs to emit.
    fn fuel_size(&self) -> usize;
}
128
129impl FuelSize for SourceMessage {
130    fn fuel_size(&self) -> usize {
131        self.byte_len()
132    }
133}
134
135impl FuelSize for Row {
136    fn fuel_size(&self) -> usize {
137        self.byte_len()
138    }
139}
140
141impl FuelSize for Vec<u8> {
142    fn fuel_size(&self) -> usize {
143        self.len()
144    }
145}
146
147impl FuelSize for bytes::Bytes {
148    fn fuel_size(&self) -> usize {
149        self.len()
150    }
151}
152
153impl<T: FuelSize, E: FuelSize> FuelSize for Result<T, E> {
154    fn fuel_size(&self) -> usize {
155        match self {
156            Ok(t) => t.fuel_size(),
157            Err(e) => e.fuel_size(),
158        }
159    }
160}
161
162/// Tuples sum the fuel size of their elements. Trivial coordinate fields
163/// (output indices, timestamps, diffs) implement `FuelSize` to return their
164/// stack size, so an `update.fuel_size()` call charges only the heap-allocated
165/// payload like rows or byte buffers.
166impl<A: FuelSize, B: FuelSize> FuelSize for (A, B) {
167    fn fuel_size(&self) -> usize {
168        self.0.fuel_size() + self.1.fuel_size()
169    }
170}
171
172impl<A: FuelSize, B: FuelSize, C: FuelSize> FuelSize for (A, B, C) {
173    fn fuel_size(&self) -> usize {
174        self.0.fuel_size() + self.1.fuel_size() + self.2.fuel_size()
175    }
176}
177
/// Implements `FuelSize` for each listed type by charging `size_of_val(self)`, i.e. only the
/// value's inline footprint. Intended for types without a heap-allocated payload.
///
/// Accepts a comma-separated (possibly empty) list of types with an optional trailing comma.
macro_rules! impl_fuel_size_stack {
    ($($ty:ty),* $(,)?) => {
        $(
            impl FuelSize for $ty {
                fn fuel_size(&self) -> usize {
                    ::std::mem::size_of_val(self)
                }
            }
        )*
    };
}
190
191impl_fuel_size_stack!(
192    usize,
193    u32,
194    u64,
195    Diff,
196    mz_repr::Timestamp,
197    mz_storage_types::sources::MzOffset,
198    mz_sql_server_util::cdc::Lsn,
199    DataflowError,
200);
201
202impl<P, T> FuelSize for mz_timely_util::order::Partitioned<P, T> {
203    fn fuel_size(&self) -> usize {
204        std::mem::size_of_val(self)
205    }
206}
207
208/// The result of probing an upstream system for its write frontier.
209#[derive(Debug, Clone, Serialize, Deserialize)]
210pub struct Probe<T> {
211    /// The timestamp at which this probe was initiated.
212    pub probe_ts: mz_repr::Timestamp,
213    /// The frontier obtain from the upstream system.
214    pub upstream_frontier: Antichain<T>,
215}
216
217/// A record produced by a source
218#[derive(
219    Debug,
220    PartialEq,
221    Eq,
222    Hash,
223    PartialOrd,
224    Ord,
225    Clone,
226    Serialize,
227    Deserialize
228)]
229pub struct SourceOutput<FromTime> {
230    /// The record's key (or some empty/default value for sources without the concept of key)
231    pub key: Row,
232    /// The record's value
233    pub value: Row,
234    /// Additional metadata columns requested by the user
235    pub metadata: Row,
236    /// The original timestamp of this message
237    pub from_time: FromTime,
238}
239
240/// The output of the decoding operator
241#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
242pub struct DecodeResult<FromTime> {
243    /// The decoded key
244    pub key: Option<Result<Row, DecodeError>>,
245    /// The decoded value, as well as the the
246    /// differential `diff` value for this value, if the value
247    /// is present and not and error.
248    pub value: Option<Result<Row, DecodeError>>,
249    /// Additional metadata requested by the user
250    pub metadata: Row,
251    /// The original timestamp of this message
252    pub from_time: FromTime,
253}
254
/// A future wrapper that gates every poll of the inner future on a semaphore permit.
///
/// A permit is acquired before each poll of `fut` and released as soon as that poll
/// returns. The semaphore therefore limits concurrent *polling*, not the number of
/// outstanding futures.
#[pin_project]
pub struct SignaledFuture<F> {
    /// The wrapped future, structurally pinned via `#[pin]`.
    #[pin]
    fut: F,
    /// The semaphore that every poll of `fut` must acquire a permit from.
    semaphore: PollSemaphore,
}
261
262impl<F: Future> SignaledFuture<F> {
263    pub fn new(semaphore: Arc<Semaphore>, fut: F) -> Self {
264        Self {
265            fut,
266            semaphore: PollSemaphore::new(semaphore),
267        }
268    }
269}
270
271impl<F: Future> Future for SignaledFuture<F> {
272    type Output = F::Output;
273
274    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
275        let this = self.project();
276        let permit = ready!(this.semaphore.poll_acquire(cx));
277        let ret = this.fut.poll(cx);
278        drop(permit);
279        ret
280    }
281}