1use std::collections::BTreeMap;
16use std::convert::Infallible;
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::task::{Context, Poll, ready};
22
23use differential_dataflow::Collection;
24use differential_dataflow::containers::TimelyStack;
25use mz_repr::{Diff, GlobalId, Row};
26use mz_storage_types::errors::{DataflowError, DecodeError};
27use mz_storage_types::sources::SourceTimestamp;
28use mz_timely_util::builder_async::PressOnDropButton;
29use pin_project::pin_project;
30use serde::{Deserialize, Serialize};
31use timely::dataflow::{Scope, ScopeParent, Stream};
32use timely::progress::Antichain;
33use tokio::sync::Semaphore;
34use tokio_util::sync::PollSemaphore;
35
36use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
37use crate::source::RawSourceCreationConfig;
38
39#[derive(Clone, Debug)]
46pub enum ProgressStatisticsUpdate {
47 SteadyState {
48 offset_known: u64,
49 offset_committed: u64,
50 },
51 Snapshot {
52 records_known: u64,
53 records_staged: u64,
54 },
55}
56
57pub type StackedCollection<G, T> =
58 Collection<G, TimelyStack<(T, <G as ScopeParent>::Timestamp, Diff)>>;
59
60pub trait SourceRender {
62 type Time: SourceTimestamp;
63 const STATUS_NAMESPACE: StatusNamespace;
64
65 fn render<G: Scope<Timestamp = Self::Time>>(
89 self,
90 scope: &mut G,
91 config: &RawSourceCreationConfig,
92 resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
93 start_signal: impl std::future::Future<Output = ()> + 'static,
94 ) -> (
95 BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
96 Stream<G, Infallible>,
97 Stream<G, HealthStatusMessage>,
98 Option<Stream<G, Probe<Self::Time>>>,
99 Vec<PressOnDropButton>,
100 );
101}
102
103#[derive(Debug, Clone)]
106pub struct SourceMessage {
107 pub key: Row,
109 pub value: Row,
111 pub metadata: Row,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct Probe<T> {
118 pub probe_ts: mz_repr::Timestamp,
120 pub upstream_frontier: Antichain<T>,
122}
123
124mod columnation {
125 use columnation::{Columnation, Region};
126 use mz_repr::Row;
127
128 use super::SourceMessage;
129
130 impl Columnation for SourceMessage {
131 type InnerRegion = SourceMessageRegion;
132 }
133
134 #[derive(Default)]
135 pub struct SourceMessageRegion {
136 inner: <Row as Columnation>::InnerRegion,
137 }
138
139 impl Region for SourceMessageRegion {
140 type Item = SourceMessage;
141
142 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
143 SourceMessage {
144 key: unsafe { self.inner.copy(&item.key) },
145 value: unsafe { self.inner.copy(&item.value) },
146 metadata: unsafe { self.inner.copy(&item.metadata) },
147 }
148 }
149
150 fn clear(&mut self) {
151 self.inner.clear()
152 }
153
154 fn reserve_items<'a, I>(&mut self, items: I)
155 where
156 Self: 'a,
157 I: Iterator<Item = &'a Self::Item> + Clone,
158 {
159 self.inner.reserve_items(
160 items
161 .map(|item| [&item.key, &item.value, &item.metadata])
162 .flatten(),
163 )
164 }
165
166 fn reserve_regions<'a, I>(&mut self, regions: I)
167 where
168 Self: 'a,
169 I: Iterator<Item = &'a Self> + Clone,
170 {
171 self.inner.reserve_regions(regions.map(|r| &r.inner))
172 }
173
174 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
175 self.inner.heap_size(callback)
176 }
177 }
178}
179
180#[derive(
182 Debug,
183 PartialEq,
184 Eq,
185 Hash,
186 PartialOrd,
187 Ord,
188 Clone,
189 Serialize,
190 Deserialize
191)]
192pub struct SourceOutput<FromTime> {
193 pub key: Row,
195 pub value: Row,
197 pub metadata: Row,
199 pub from_time: FromTime,
201}
202
203#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
205pub struct DecodeResult<FromTime> {
206 pub key: Option<Result<Row, DecodeError>>,
208 pub value: Option<Result<Row, DecodeError>>,
212 pub metadata: Row,
214 pub from_time: FromTime,
216}
217
218#[pin_project]
219pub struct SignaledFuture<F> {
220 #[pin]
221 fut: F,
222 semaphore: PollSemaphore,
223}
224
225impl<F: Future> SignaledFuture<F> {
226 pub fn new(semaphore: Arc<Semaphore>, fut: F) -> Self {
227 Self {
228 fut,
229 semaphore: PollSemaphore::new(semaphore),
230 }
231 }
232}
233
234impl<F: Future> Future for SignaledFuture<F> {
235 type Output = F::Output;
236
237 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
238 let this = self.project();
239 let permit = ready!(this.semaphore.poll_acquire(cx));
240 let ret = this.fut.poll(cx);
241 drop(permit);
242 ret
243 }
244}