1use std::collections::BTreeMap;
16use std::fmt::Debug;
17use std::future::Future;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll, ready};
21
22use differential_dataflow::Collection;
23use mz_repr::{Diff, GlobalId, Row};
24use mz_storage_types::errors::{DataflowError, DecodeError};
25use mz_storage_types::sources::SourceTimestamp;
26use mz_timely_util::builder_async::PressOnDropButton;
27use pin_project::pin_project;
28use serde::{Deserialize, Serialize};
29use timely::dataflow::{Scope, StreamVec};
30use timely::progress::Antichain;
31use tokio::sync::Semaphore;
32use tokio_util::sync::PollSemaphore;
33
34use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
35use crate::source::RawSourceCreationConfig;
36
/// A progress-statistics update, reported in one of two shapes: one for
/// steady-state operation and one for a snapshot.
///
/// Every counter is a plain `u64`; the units are not defined in this file.
#[derive(Debug, Clone)]
pub enum ProgressStatisticsUpdate {
    /// Offset-based counters.
    // NOTE(review): field meanings below are inferred from their names.
    SteadyState {
        /// Offset known to exist upstream (presumably).
        offset_known: u64,
        /// Offset that has been committed (presumably).
        offset_committed: u64,
    },
    /// Record-based counters.
    Snapshot {
        /// Number of records known to be part of the snapshot (presumably).
        records_known: u64,
        /// Number of records staged so far (presumably).
        records_staged: u64,
    },
}
54
55pub type StackedCollection<'scope, T, D> = Collection<'scope, T, Vec<(D, T, Diff)>>;
56
57pub trait SourceRender {
59 type Time: SourceTimestamp;
60 const STATUS_NAMESPACE: StatusNamespace;
61
62 fn render<'scope>(
86 self,
87 scope: Scope<'scope, Self::Time>,
88 config: &RawSourceCreationConfig,
89 resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
90 start_signal: impl std::future::Future<Output = ()> + 'static,
91 ) -> (
92 BTreeMap<
93 GlobalId,
94 StackedCollection<'scope, Self::Time, Result<SourceMessage, DataflowError>>,
95 >,
96 StreamVec<'scope, Self::Time, HealthStatusMessage>,
97 StreamVec<'scope, Self::Time, Probe<Self::Time>>,
98 Vec<PressOnDropButton>,
99 );
100}
101
102#[derive(Debug, Clone)]
105pub struct SourceMessage {
106 pub key: Row,
108 pub value: Row,
110 pub metadata: Row,
112}
113
114impl SourceMessage {
115 pub fn byte_len(&self) -> usize {
117 self.key.byte_len() + self.value.byte_len() + self.metadata.byte_len()
118 }
119}
120
/// Types that can report a size to be charged as "fuel".
///
/// Every implementation visible in this file reports a byte count.
// NOTE(review): how the reported value is consumed is not visible here.
pub trait FuelSize {
    /// Returns the size attributed to `self`.
    fn fuel_size(&self) -> usize;
}
128
129impl FuelSize for SourceMessage {
130 fn fuel_size(&self) -> usize {
131 self.byte_len()
132 }
133}
134
135impl FuelSize for Row {
136 fn fuel_size(&self) -> usize {
137 self.byte_len()
138 }
139}
140
141impl FuelSize for Vec<u8> {
142 fn fuel_size(&self) -> usize {
143 self.len()
144 }
145}
146
147impl FuelSize for bytes::Bytes {
148 fn fuel_size(&self) -> usize {
149 self.len()
150 }
151}
152
153impl<T: FuelSize, E: FuelSize> FuelSize for Result<T, E> {
154 fn fuel_size(&self) -> usize {
155 match self {
156 Ok(t) => t.fuel_size(),
157 Err(e) => e.fuel_size(),
158 }
159 }
160}
161
162impl<A: FuelSize, B: FuelSize> FuelSize for (A, B) {
167 fn fuel_size(&self) -> usize {
168 self.0.fuel_size() + self.1.fuel_size()
169 }
170}
171
172impl<A: FuelSize, B: FuelSize, C: FuelSize> FuelSize for (A, B, C) {
173 fn fuel_size(&self) -> usize {
174 self.0.fuel_size() + self.1.fuel_size() + self.2.fuel_size()
175 }
176}
177
/// Implements `FuelSize` for every listed type by reporting the size of the
/// type itself. Data a value points to (for example heap allocations) is not
/// counted.
///
/// Accepts a comma-separated list of types with an optional trailing comma.
macro_rules! impl_fuel_size_stack {
    ($($ty:ty),* $(,)?) => {
        $(
            impl FuelSize for $ty {
                fn fuel_size(&self) -> usize {
                    // Every `$ty` here is sized, so this equals
                    // `size_of_val(self)`.
                    std::mem::size_of::<$ty>()
                }
            }
        )*
    };
}
190
191impl_fuel_size_stack!(
192 usize,
193 u32,
194 u64,
195 Diff,
196 mz_repr::Timestamp,
197 mz_storage_types::sources::MzOffset,
198 mz_sql_server_util::cdc::Lsn,
199 DataflowError,
200);
201
202impl<P, T> FuelSize for mz_timely_util::order::Partitioned<P, T> {
203 fn fuel_size(&self) -> usize {
204 std::mem::size_of_val(self)
205 }
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize)]
210pub struct Probe<T> {
211 pub probe_ts: mz_repr::Timestamp,
213 pub upstream_frontier: Antichain<T>,
215}
216
217#[derive(
219 Debug,
220 PartialEq,
221 Eq,
222 Hash,
223 PartialOrd,
224 Ord,
225 Clone,
226 Serialize,
227 Deserialize
228)]
229pub struct SourceOutput<FromTime> {
230 pub key: Row,
232 pub value: Row,
234 pub metadata: Row,
236 pub from_time: FromTime,
238}
239
240#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)]
242pub struct DecodeResult<FromTime> {
243 pub key: Option<Result<Row, DecodeError>>,
245 pub value: Option<Result<Row, DecodeError>>,
249 pub metadata: Row,
251 pub from_time: FromTime,
253}
254
255#[pin_project]
256pub struct SignaledFuture<F> {
257 #[pin]
258 fut: F,
259 semaphore: PollSemaphore,
260}
261
262impl<F: Future> SignaledFuture<F> {
263 pub fn new(semaphore: Arc<Semaphore>, fut: F) -> Self {
264 Self {
265 fut,
266 semaphore: PollSemaphore::new(semaphore),
267 }
268 }
269}
270
271impl<F: Future> Future for SignaledFuture<F> {
272 type Output = F::Output;
273
274 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
275 let this = self.project();
276 let permit = ready!(this.semaphore.poll_acquire(cx));
277 let ret = this.fut.poll(cx);
278 drop(permit);
279 ret
280 }
281}