1use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll, ready};
21
22use differential_dataflow::Collection;
23use mz_repr::{Diff, GlobalId, Row};
24use mz_storage_types::errors::{DataflowError, DecodeError};
25use mz_storage_types::sources::SourceTimestamp;
26use mz_timely_util::builder_async::PressOnDropButton;
27use mz_timely_util::columnation::ColumnationStack;
28use pin_project::pin_project;
29use serde::{Deserialize, Serialize};
30use timely::dataflow::{Scope, StreamVec};
31use timely::progress::Antichain;
32use tokio::sync::Semaphore;
33use tokio_util::sync::PollSemaphore;
34
35use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
36use crate::source::RawSourceCreationConfig;
37
/// A progress-statistics update, carrying one of two sets of counters.
///
/// NOTE(review): the variant names suggest `Snapshot` covers an initial snapshot phase and
/// `SteadyState` covers ongoing ingestion afterwards; the producers and consumers of this
/// type are not visible in this file, so confirm against them.
#[derive(Clone, Debug)]
pub enum ProgressStatisticsUpdate {
    /// Offset-based counters.
    SteadyState {
        /// The "known" offset (presumably the latest offset observed upstream; TODO confirm).
        offset_known: u64,
        /// The "committed" offset (presumably the latest offset durably ingested; TODO confirm).
        offset_committed: u64,
    },
    /// Record-count-based counters.
    Snapshot {
        /// Number of records known (presumably the total in the snapshot; TODO confirm).
        records_known: u64,
        /// Number of records staged (presumably those read so far; TODO confirm).
        records_staged: u64,
    },
}
55
56pub type StackedCollection<'scope, T, D> = Collection<'scope, T, ColumnationStack<(D, T, Diff)>>;
57
58pub trait SourceRender {
60 type Time: SourceTimestamp;
61 const STATUS_NAMESPACE: StatusNamespace;
62
63 fn render<'scope>(
87 self,
88 scope: Scope<'scope, Self::Time>,
89 config: &RawSourceCreationConfig,
90 resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
91 start_signal: impl std::future::Future<Output = ()> + 'static,
92 ) -> (
93 BTreeMap<
94 GlobalId,
95 StackedCollection<'scope, Self::Time, Result<SourceMessage, DataflowError>>,
96 >,
97 StreamVec<'scope, Self::Time, HealthStatusMessage>,
98 StreamVec<'scope, Self::Time, Probe<Self::Time>>,
99 Vec<PressOnDropButton>,
100 );
101}
102
103#[derive(Debug, Clone)]
106pub struct SourceMessage {
107 pub key: Row,
109 pub value: Row,
111 pub metadata: Row,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct Probe<T> {
118 pub probe_ts: mz_repr::Timestamp,
120 pub upstream_frontier: Antichain<T>,
122}
123
124mod columnation {
125 use columnation::{Columnation, Region};
126 use mz_repr::Row;
127
128 use super::SourceMessage;
129
130 impl Columnation for SourceMessage {
131 type InnerRegion = SourceMessageRegion;
132 }
133
134 #[derive(Default)]
135 pub struct SourceMessageRegion {
136 inner: <Row as Columnation>::InnerRegion,
137 }
138
139 impl Region for SourceMessageRegion {
140 type Item = SourceMessage;
141
142 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
143 SourceMessage {
144 key: unsafe { self.inner.copy(&item.key) },
145 value: unsafe { self.inner.copy(&item.value) },
146 metadata: unsafe { self.inner.copy(&item.metadata) },
147 }
148 }
149
150 fn clear(&mut self) {
151 self.inner.clear()
152 }
153
154 fn reserve_items<'a, I>(&mut self, items: I)
155 where
156 Self: 'a,
157 I: Iterator<Item = &'a Self::Item> + Clone,
158 {
159 self.inner.reserve_items(
160 items
161 .map(|item| [&item.key, &item.value, &item.metadata])
162 .flatten(),
163 )
164 }
165
166 fn reserve_regions<'a, I>(&mut self, regions: I)
167 where
168 Self: 'a,
169 I: Iterator<Item = &'a Self> + Clone,
170 {
171 self.inner.reserve_regions(regions.map(|r| &r.inner))
172 }
173
174 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
175 self.inner.heap_size(callback)
176 }
177 }
178}
179
180#[derive(
182 Debug,
183 PartialEq,
184 Eq,
185 Hash,
186 PartialOrd,
187 Ord,
188 Clone,
189 Serialize,
190 Deserialize
191)]
192pub struct SourceOutput<FromTime> {
193 pub key: Row,
195 pub value: Row,
197 pub metadata: Row,
199 pub from_time: FromTime,
201}
202
203#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
205pub struct DecodeResult<FromTime> {
206 pub key: Option<Result<Row, DecodeError>>,
208 pub value: Option<Result<Row, DecodeError>>,
212 pub metadata: Row,
214 pub from_time: FromTime,
216}
217
218#[pin_project]
219pub struct SignaledFuture<F> {
220 #[pin]
221 fut: F,
222 semaphore: PollSemaphore,
223}
224
225impl<F: Future> SignaledFuture<F> {
226 pub fn new(semaphore: Arc<Semaphore>, fut: F) -> Self {
227 Self {
228 fut,
229 semaphore: PollSemaphore::new(semaphore),
230 }
231 }
232}
233
234impl<F: Future> Future for SignaledFuture<F> {
235 type Output = F::Output;
236
237 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
238 let this = self.project();
239 let permit = ready!(this.semaphore.poll_acquire(cx));
240 let ret = this.fut.poll(cx);
241 drop(permit);
242 ret
243 }
244}