1use std::collections::BTreeMap;
16use std::convert::Infallible;
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use differential_dataflow::Collection;
24use differential_dataflow::containers::TimelyStack;
25use mz_repr::{Diff, GlobalId, Row};
26use mz_storage_types::errors::{DataflowError, DecodeError};
27use mz_storage_types::sources::SourceTimestamp;
28use mz_timely_util::builder_async::PressOnDropButton;
29use pin_project::pin_project;
30use serde::{Deserialize, Serialize};
31use timely::dataflow::{Scope, ScopeParent, Stream};
32use timely::progress::Antichain;
33use tokio::sync::Semaphore;
34use tokio_util::sync::PollSemaphore;
35
36use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
37use crate::source::RawSourceCreationConfig;
38
/// A progress-statistics update, reported in one of two shapes.
///
/// NOTE(review): the variant and field names suggest that `SteadyState` tracks
/// upstream offsets while `Snapshot` tracks record counts during an initial
/// snapshot — confirm against the code that produces and consumes these updates.
#[derive(Debug, Clone)]
pub enum ProgressStatisticsUpdate {
    /// Offset-based progress, expressed as two `u64` counters.
    SteadyState {
        /// The "known" offset (presumably the highest offset known to exist upstream —
        /// TODO confirm).
        offset_known: u64,
        /// The "committed" offset (presumably the highest offset durably ingested —
        /// TODO confirm).
        offset_committed: u64,
    },
    /// Record-count-based progress, expressed as two `u64` counters.
    Snapshot {
        /// The "known" record count (presumably the total size of the snapshot —
        /// TODO confirm).
        records_known: u64,
        /// The "staged" record count (presumably the records read so far —
        /// TODO confirm).
        records_staged: u64,
    },
}
56
/// A differential `Collection` of `T` in scope `G` with `Diff` differences, whose
/// underlying container is a `TimelyStack` of `(T, G::Timestamp, Diff)` update triples
/// rather than the collection's default container.
pub type StackedCollection<G, T> =
    Collection<G, T, Diff, TimelyStack<(T, <G as ScopeParent>::Timestamp, Diff)>>;
59
/// A source that can render itself into a timely dataflow scope.
pub trait SourceRender {
    /// The timestamp type of the scope this source renders into. It must implement
    /// `SourceTimestamp`.
    type Time: SourceTimestamp;
    /// The `StatusNamespace` associated with this source implementation (presumably the
    /// namespace attached to the health statuses it reports — TODO confirm).
    const STATUS_NAMESPACE: StatusNamespace;

    /// Renders the source into `scope`, consuming `self`.
    ///
    /// # Parameters
    ///
    /// - `scope`: the scope to build into; its timestamp type is `Self::Time`.
    /// - `config`: the creation configuration of the raw source.
    /// - `resume_uppers`: an async stream of `Antichain<Self::Time>` frontiers
    ///   (presumably the frontiers from which ingestion may resume — TODO confirm).
    /// - `start_signal`: a future resolving to `()` (presumably completes once the
    ///   source is allowed to start producing data — TODO confirm).
    ///
    /// # Returns
    ///
    /// A five-element tuple:
    ///
    /// 1. A map from `GlobalId` to a [`StackedCollection`] of
    ///    `Result<SourceMessage, DataflowError>` (presumably one collection per
    ///    source export — TODO confirm).
    /// 2. A `Stream` of `Infallible`. `Infallible` has no values, so this stream can
    ///    never carry data (presumably only its frontier is meaningful — TODO confirm).
    /// 3. A `Stream` of `HealthStatusMessage`.
    /// 4. An optional `Stream` of [`Probe`]s over `Self::Time`.
    /// 5. A `Vec` of `PressOnDropButton`s (presumably shutdown handles for the rendered
    ///    operators that fire when dropped — TODO confirm).
    fn render<G: Scope<Timestamp = Self::Time>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Option<Stream<G, Probe<Self::Time>>>,
        Vec<PressOnDropButton>,
    );
}
102
/// A single source message, made of three `Row`s: a key, a value, and metadata.
#[derive(Debug, Clone)]
pub struct SourceMessage {
    /// The key part of the message.
    pub key: Row,
    /// The value part of the message.
    pub value: Row,
    /// Additional metadata attached to the message.
    pub metadata: Row,
}
114
/// A probe record pairing a timestamp with an upstream frontier.
///
/// NOTE(review): presumably the result of probing the upstream system for its current
/// frontier — confirm against the code that emits these.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Probe<T> {
    /// The `mz_repr::Timestamp` of the probe (presumably when it was taken —
    /// TODO confirm).
    pub probe_ts: mz_repr::Timestamp,
    /// The upstream frontier, as an `Antichain` over the source timestamp type `T`.
    pub upstream_frontier: Antichain<T>,
}
123
124mod columnation {
125 use columnation::{Columnation, Region};
126 use mz_repr::Row;
127
128 use super::SourceMessage;
129
130 impl Columnation for SourceMessage {
131 type InnerRegion = SourceMessageRegion;
132 }
133
134 #[derive(Default)]
135 pub struct SourceMessageRegion {
136 inner: <Row as Columnation>::InnerRegion,
137 }
138
139 impl Region for SourceMessageRegion {
140 type Item = SourceMessage;
141
142 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
143 SourceMessage {
144 key: unsafe { self.inner.copy(&item.key) },
145 value: unsafe { self.inner.copy(&item.value) },
146 metadata: unsafe { self.inner.copy(&item.metadata) },
147 }
148 }
149
150 fn clear(&mut self) {
151 self.inner.clear()
152 }
153
154 fn reserve_items<'a, I>(&mut self, items: I)
155 where
156 Self: 'a,
157 I: Iterator<Item = &'a Self::Item> + Clone,
158 {
159 self.inner.reserve_items(
160 items
161 .map(|item| [&item.key, &item.value, &item.metadata])
162 .flatten(),
163 )
164 }
165
166 fn reserve_regions<'a, I>(&mut self, regions: I)
167 where
168 Self: 'a,
169 I: Iterator<Item = &'a Self> + Clone,
170 {
171 self.inner.reserve_regions(regions.map(|r| &r.inner))
172 }
173
174 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
175 self.inner.heap_size(callback)
176 }
177 }
178}
179
/// A source output record: key, value, and metadata `Row`s plus a `FromTime` tag.
///
/// The derived ordering and hashing follow the field declaration order below.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Clone, Serialize, Deserialize)]
pub struct SourceOutput<FromTime> {
    /// The key part of the record.
    pub key: Row,
    /// The value part of the record.
    pub value: Row,
    /// Additional metadata attached to the record.
    pub metadata: Row,
    /// A `FromTime` value associated with the record (presumably its timestamp in the
    /// upstream system's time domain — TODO confirm).
    pub from_time: FromTime,
}
192
/// The result of decoding a record: an optional, fallible key and value, together with
/// metadata and a `FromTime` tag.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
pub struct DecodeResult<FromTime> {
    /// The key: `Some(Ok(row))` for a decoded key, `Some(Err(e))` for a `DecodeError`,
    /// or `None` (presumably when the record has no key — TODO confirm).
    pub key: Option<Result<Row, DecodeError>>,
    /// The value: `Some(Ok(row))` for a decoded value, `Some(Err(e))` for a
    /// `DecodeError`, or `None` (presumably when the record has no value —
    /// TODO confirm).
    pub value: Option<Result<Row, DecodeError>>,
    /// Additional metadata attached to the record.
    pub metadata: Row,
    /// A `FromTime` value associated with the record (presumably its timestamp in the
    /// upstream system's time domain — TODO confirm).
    pub from_time: FromTime,
}
207
/// A future wrapper that pairs an inner future with a semaphore.
///
/// Its `Future` implementation acquires a permit from `semaphore` before each poll of
/// `fut` and releases it once that poll returns.
#[pin_project]
pub struct SignaledFuture<F> {
    /// The wrapped future; marked `#[pin]` so it is projected as a pinned reference.
    #[pin]
    fut: F,
    /// The semaphore polled for a permit before each poll of `fut`.
    semaphore: PollSemaphore,
}
214
215impl<F: Future> SignaledFuture<F> {
216 pub fn new(semaphore: Arc<Semaphore>, fut: F) -> Self {
217 Self {
218 fut,
219 semaphore: PollSemaphore::new(semaphore),
220 }
221 }
222}
223
224impl<F: Future> Future for SignaledFuture<F> {
225 type Output = F::Output;
226
227 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
228 let this = self.project();
229 let permit = ready!(this.semaphore.poll_acquire(cx));
230 let ret = this.fut.poll(cx);
231 drop(permit);
232 ret
233 }
234}