1use std::collections::BTreeMap;
16use std::convert::Infallible;
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use differential_dataflow::Collection;
24use differential_dataflow::containers::TimelyStack;
25use mz_repr::{Diff, GlobalId, Row};
26use mz_storage_types::errors::{DataflowError, DecodeError};
27use mz_storage_types::sources::SourceTimestamp;
28use mz_timely_util::builder_async::PressOnDropButton;
29use pin_project::pin_project;
30use serde::{Deserialize, Serialize};
31use timely::dataflow::{Scope, ScopeParent, Stream};
32use timely::progress::Antichain;
33use tokio::sync::Semaphore;
34use tokio_util::sync::PollSemaphore;
35
36use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
37use crate::source::RawSourceCreationConfig;
38
/// A progress-statistics update, as emitted on the statistics stream returned by
/// [`SourceRender::render`].
///
/// NOTE(review): the precise meaning of "known", "committed" and "staged" is defined
/// by each source implementation; the field descriptions below are inferred from the
/// field names and should be confirmed against the producers.
#[derive(Clone, Debug)]
pub enum ProgressStatisticsUpdate {
    /// Progress reported while the source is in its steady state.
    SteadyState {
        /// Presumably the latest upstream offset the source is aware of.
        offset_known: u64,
        /// Presumably the upstream offset up to which data has been committed.
        offset_committed: u64,
    },
    /// Progress reported while the source is taking its snapshot.
    Snapshot {
        /// Presumably the number of snapshot records the source knows it must read.
        records_known: u64,
        /// Presumably the number of snapshot records staged so far.
        records_staged: u64,
    },
}
56
/// A differential [`Collection`] of `T` in scope `G` whose updates
/// `(data, time, diff)` are stored in a columnar [`TimelyStack`] container.
// The qualified `<G as ScopeParent>::Timestamp` path is required: type aliases do
// not carry bounds, so the `G::Timestamp` shorthand would be ambiguous here.
pub type StackedCollection<G, T> =
    Collection<G, T, Diff, TimelyStack<(T, <G as ScopeParent>::Timestamp, Diff)>>;
59
/// A source that can be rendered into a timely dataflow scope.
pub trait SourceRender {
    /// The timestamp type native to this source. The scope passed to
    /// [`SourceRender::render`] must use this as its timestamp.
    type Time: SourceTimestamp;
    /// The [`StatusNamespace`] associated with this source.
    // NOTE(review): presumably the namespace that this source's health updates are
    // reported under — confirm.
    const STATUS_NAMESPACE: StatusNamespace;

    /// Renders this source into `scope`, consuming `self`.
    ///
    /// Parameters:
    /// - `scope`: the dataflow scope, whose timestamp is [`SourceRender::Time`].
    /// - `config`: the raw source creation configuration.
    /// - `resume_uppers`: a stream of frontiers in `Self::Time`.
    ///   NOTE(review): presumably the uppers the source may resume from — confirm
    ///   against callers.
    /// - `start_signal`: a future with no output.
    ///   NOTE(review): presumably the source should not start producing data until it
    ///   resolves — confirm.
    ///
    /// Returns, in order:
    /// 1. collections of `Result<SourceMessage, DataflowError>` keyed by [`GlobalId`]
    ///    (presumably one per source export);
    /// 2. a `Stream<G, Infallible>`, which can never carry records, so only its
    ///    frontier conveys information;
    /// 3. a stream of [`HealthStatusMessage`]s;
    /// 4. a stream of [`ProgressStatisticsUpdate`]s;
    /// 5. an optional stream of [`Probe`]s;
    /// 6. [`PressOnDropButton`]s.
    ///    NOTE(review): presumably dropping them shuts down the rendered operators —
    ///    confirm.
    fn render<G: Scope<Timestamp = Self::Time>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Stream<G, ProgressStatisticsUpdate>,
        Option<Stream<G, Probe<Self::Time>>>,
        Vec<PressOnDropButton>,
    );
}
103
/// A single message produced by a source, made of three [`Row`]s.
///
/// Sources return these (wrapped in `Result<_, DataflowError>`) from
/// [`SourceRender::render`].
#[derive(Debug, Clone)]
pub struct SourceMessage {
    /// The key of the message.
    pub key: Row,
    /// The value of the message.
    pub value: Row,
    /// Additional metadata attached to the message.
    // NOTE(review): which columns appear here depends on the source — confirm.
    pub metadata: Row,
}
115
/// A probe of a source's upstream frontier, tagged with an `mz_repr::Timestamp`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Probe<T> {
    /// The timestamp associated with this probe.
    // NOTE(review): presumably the time at which the probe was taken — confirm.
    pub probe_ts: mz_repr::Timestamp,
    /// The upstream frontier observed by the probe, in the source's timestamp type `T`.
    pub upstream_frontier: Antichain<T>,
}
124
125mod columnation {
126 use columnation::{Columnation, Region};
127 use mz_repr::Row;
128
129 use super::SourceMessage;
130
131 impl Columnation for SourceMessage {
132 type InnerRegion = SourceMessageRegion;
133 }
134
135 #[derive(Default)]
136 pub struct SourceMessageRegion {
137 inner: <Row as Columnation>::InnerRegion,
138 }
139
140 impl Region for SourceMessageRegion {
141 type Item = SourceMessage;
142
143 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
144 SourceMessage {
145 key: unsafe { self.inner.copy(&item.key) },
146 value: unsafe { self.inner.copy(&item.value) },
147 metadata: unsafe { self.inner.copy(&item.metadata) },
148 }
149 }
150
151 fn clear(&mut self) {
152 self.inner.clear()
153 }
154
155 fn reserve_items<'a, I>(&mut self, items: I)
156 where
157 Self: 'a,
158 I: Iterator<Item = &'a Self::Item> + Clone,
159 {
160 self.inner.reserve_items(
161 items
162 .map(|item| [&item.key, &item.value, &item.metadata])
163 .flatten(),
164 )
165 }
166
167 fn reserve_regions<'a, I>(&mut self, regions: I)
168 where
169 Self: 'a,
170 I: Iterator<Item = &'a Self> + Clone,
171 {
172 self.inner.reserve_regions(regions.map(|r| &r.inner))
173 }
174
175 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
176 self.inner.heap_size(callback)
177 }
178 }
179}
180
/// A source message together with the upstream time it originated at.
///
/// The derived `PartialOrd`/`Ord` compare fields in declaration order (`key`, then
/// `value`, then `metadata`, then `from_time`), so the field order is load-bearing.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Serialize, Deserialize)]
pub struct SourceOutput<FromTime> {
    /// The key of the message.
    pub key: Row,
    /// The value of the message.
    pub value: Row,
    /// Additional metadata attached to the message.
    pub metadata: Row,
    /// The time associated with this output, in the `FromTime` domain.
    // NOTE(review): presumably the source's upstream timestamp — confirm.
    pub from_time: FromTime,
}
193
/// The result of decoding a source message's key and value.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct DecodeResult<FromTime> {
    /// The decoded key. `Some(Err(_))` carries a decode failure.
    // NOTE(review): presumably `None` means no key was present/decoded — confirm.
    pub key: Option<Result<Row, DecodeError>>,
    /// The decoded value. `Some(Err(_))` carries a decode failure.
    // NOTE(review): presumably `None` means no value was present/decoded — confirm.
    pub value: Option<Result<Row, DecodeError>>,
    /// Additional metadata attached to the message.
    pub metadata: Row,
    /// The time associated with this result, in the `FromTime` domain.
    pub from_time: FromTime,
}
208
/// A future wrapper whose inner future is polled only after acquiring a permit from a
/// shared semaphore. The permit is released again as soon as that poll returns.
#[pin_project]
pub struct SignaledFuture<F> {
    // Structurally pinned, so `poll` can project `Pin<&mut Self>` to `Pin<&mut F>`.
    #[pin]
    fut: F,
    // Source of the permits that gate each poll of `fut`.
    semaphore: PollSemaphore,
}
215
216impl<F: Future> SignaledFuture<F> {
217 pub fn new(semaphore: Arc<Semaphore>, fut: F) -> Self {
218 Self {
219 fut,
220 semaphore: PollSemaphore::new(semaphore),
221 }
222 }
223}
224
225impl<F: Future> Future for SignaledFuture<F> {
226 type Output = F::Output;
227
228 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
229 let this = self.project();
230 let permit = ready!(this.semaphore.poll_acquire(cx));
231 let ret = this.fut.poll(cx);
232 drop(permit);
233 ret
234 }
235}