1use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll, ready};
21
22use differential_dataflow::Collection;
23use differential_dataflow::containers::TimelyStack;
24use mz_repr::{Diff, GlobalId, Row};
25use mz_storage_types::errors::{DataflowError, DecodeError};
26use mz_storage_types::sources::SourceTimestamp;
27use mz_timely_util::builder_async::PressOnDropButton;
28use pin_project::pin_project;
29use serde::{Deserialize, Serialize};
30use timely::dataflow::{Scope, ScopeParent, StreamVec};
31use timely::progress::Antichain;
32use tokio::sync::Semaphore;
33use tokio_util::sync::PollSemaphore;
34
35use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
36use crate::source::RawSourceCreationConfig;
37
38#[derive(Clone, Debug)]
45pub enum ProgressStatisticsUpdate {
46 SteadyState {
47 offset_known: u64,
48 offset_committed: u64,
49 },
50 Snapshot {
51 records_known: u64,
52 records_staged: u64,
53 },
54}
55
56pub type StackedCollection<G, T> =
57 Collection<G, TimelyStack<(T, <G as ScopeParent>::Timestamp, Diff)>>;
58
59pub trait SourceRender {
61 type Time: SourceTimestamp;
62 const STATUS_NAMESPACE: StatusNamespace;
63
64 fn render<G: Scope<Timestamp = Self::Time>>(
88 self,
89 scope: &mut G,
90 config: &RawSourceCreationConfig,
91 resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
92 start_signal: impl std::future::Future<Output = ()> + 'static,
93 ) -> (
94 BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
95 StreamVec<G, HealthStatusMessage>,
96 StreamVec<G, Probe<Self::Time>>,
97 Vec<PressOnDropButton>,
98 );
99}
100
101#[derive(Debug, Clone)]
104pub struct SourceMessage {
105 pub key: Row,
107 pub value: Row,
109 pub metadata: Row,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct Probe<T> {
116 pub probe_ts: mz_repr::Timestamp,
118 pub upstream_frontier: Antichain<T>,
120}
121
122mod columnation {
123 use columnation::{Columnation, Region};
124 use mz_repr::Row;
125
126 use super::SourceMessage;
127
128 impl Columnation for SourceMessage {
129 type InnerRegion = SourceMessageRegion;
130 }
131
132 #[derive(Default)]
133 pub struct SourceMessageRegion {
134 inner: <Row as Columnation>::InnerRegion,
135 }
136
137 impl Region for SourceMessageRegion {
138 type Item = SourceMessage;
139
140 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
141 SourceMessage {
142 key: unsafe { self.inner.copy(&item.key) },
143 value: unsafe { self.inner.copy(&item.value) },
144 metadata: unsafe { self.inner.copy(&item.metadata) },
145 }
146 }
147
148 fn clear(&mut self) {
149 self.inner.clear()
150 }
151
152 fn reserve_items<'a, I>(&mut self, items: I)
153 where
154 Self: 'a,
155 I: Iterator<Item = &'a Self::Item> + Clone,
156 {
157 self.inner.reserve_items(
158 items
159 .map(|item| [&item.key, &item.value, &item.metadata])
160 .flatten(),
161 )
162 }
163
164 fn reserve_regions<'a, I>(&mut self, regions: I)
165 where
166 Self: 'a,
167 I: Iterator<Item = &'a Self> + Clone,
168 {
169 self.inner.reserve_regions(regions.map(|r| &r.inner))
170 }
171
172 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
173 self.inner.heap_size(callback)
174 }
175 }
176}
177
178#[derive(
180 Debug,
181 PartialEq,
182 Eq,
183 Hash,
184 PartialOrd,
185 Ord,
186 Clone,
187 Serialize,
188 Deserialize
189)]
190pub struct SourceOutput<FromTime> {
191 pub key: Row,
193 pub value: Row,
195 pub metadata: Row,
197 pub from_time: FromTime,
199}
200
201#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
203pub struct DecodeResult<FromTime> {
204 pub key: Option<Result<Row, DecodeError>>,
206 pub value: Option<Result<Row, DecodeError>>,
210 pub metadata: Row,
212 pub from_time: FromTime,
214}
215
216#[pin_project]
217pub struct SignaledFuture<F> {
218 #[pin]
219 fut: F,
220 semaphore: PollSemaphore,
221}
222
223impl<F: Future> SignaledFuture<F> {
224 pub fn new(semaphore: Arc<Semaphore>, fut: F) -> Self {
225 Self {
226 fut,
227 semaphore: PollSemaphore::new(semaphore),
228 }
229 }
230}
231
232impl<F: Future> Future for SignaledFuture<F> {
233 type Output = F::Output;
234
235 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
236 let this = self.project();
237 let permit = ready!(this.semaphore.poll_acquire(cx));
238 let ret = this.fut.poll(cx);
239 drop(permit);
240 ret
241 }
242}