1use std::any::Any;
11use std::cell::RefCell;
12use std::ops::DerefMut;
13use std::rc::Rc;
14
15use differential_dataflow::consolidation::consolidate_updates;
16use differential_dataflow::{AsCollection, VecCollection};
17use mz_compute_client::protocol::response::{SubscribeBatch, SubscribeResponse};
18use mz_compute_types::sinks::{ComputeSinkDesc, SubscribeSinkConnection};
19use mz_expr::{ColumnOrder, compare_columns};
20use mz_ore::iter;
21use mz_repr::{Diff, GlobalId, Row, Timestamp, UpdateCollection};
22use mz_storage_types::controller::CollectionMetadata;
23use mz_storage_types::errors::DataflowError;
24use mz_timely_util::probe::{Handle, ProbeNotify};
25use timely::PartialOrder;
26use timely::dataflow::Scope;
27use timely::dataflow::channels::pact::Pipeline;
28use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
29use timely::progress::Antichain;
30use timely::progress::timestamp::Timestamp as TimelyTimestamp;
31
32use crate::render::StartSignal;
33use crate::render::sinks::SinkRender;
34
35impl<G> SinkRender<G> for SubscribeSinkConnection
36where
37 G: Scope<Timestamp = Timestamp>,
38{
39 fn render_sink(
40 &self,
41 compute_state: &mut crate::compute_state::ComputeState,
42 sink: &ComputeSinkDesc<CollectionMetadata>,
43 sink_id: GlobalId,
44 as_of: Antichain<Timestamp>,
45 _start_signal: StartSignal,
46 sinked_collection: VecCollection<G, Row, Diff>,
47 err_collection: VecCollection<G, DataflowError, Diff>,
48 _ct_times: Option<VecCollection<G, (), Diff>>,
49 output_probe: &Handle<Timestamp>,
50 ) -> Option<Rc<dyn Any>> {
51 let subscribe_protocol_handle = Rc::new(RefCell::new(Some(SubscribeProtocol {
55 sink_id,
56 sink_as_of: as_of.clone(),
57 subscribe_response_buffer: Some(Rc::clone(&compute_state.subscribe_response_buffer)),
58 prev_upper: Antichain::from_elem(Timestamp::minimum()),
59 output: self.output.clone(),
60 poison: None,
61 })));
62 let subscribe_protocol_weak = Rc::downgrade(&subscribe_protocol_handle);
63 let sinked_collection = sinked_collection
64 .inner
65 .probe_notify_with(vec![output_probe.clone()])
66 .as_collection();
67 subscribe(
68 sinked_collection,
69 err_collection,
70 sink_id,
71 sink.with_snapshot,
72 as_of,
73 sink.up_to.clone(),
74 subscribe_protocol_handle,
75 );
76
77 Some(Rc::new(scopeguard::guard((), move |_| {
81 if let Some(subscribe_protocol_handle) = subscribe_protocol_weak.upgrade() {
82 std::mem::drop(subscribe_protocol_handle.borrow_mut().take())
83 }
84 })))
85 }
86}
87
88fn subscribe<G>(
89 sinked_collection: VecCollection<G, Row, Diff>,
90 err_collection: VecCollection<G, DataflowError, Diff>,
91 sink_id: GlobalId,
92 with_snapshot: bool,
93 as_of: Antichain<G::Timestamp>,
94 up_to: Antichain<G::Timestamp>,
95 subscribe_protocol_handle: Rc<RefCell<Option<SubscribeProtocol>>>,
96) where
97 G: Scope<Timestamp = Timestamp>,
98{
99 let name = format!("subscribe-{}", sink_id);
100 let mut op = OperatorBuilder::new(name, sinked_collection.scope());
101 let mut ok_input = op.new_input(sinked_collection.inner, Pipeline);
102 let mut err_input = op.new_input(err_collection.inner, Pipeline);
103
104 op.build(|_cap| {
105 let mut rows_to_emit = Vec::new();
106 let mut errors_to_emit = Vec::new();
107 let mut finished = false;
108
109 move |frontiers| {
110 if finished {
111 ok_input.for_each(|_, _| {});
113 err_input.for_each(|_, _| {});
114 return;
115 }
116
117 let mut frontier = Antichain::new();
118 for input_frontier in frontiers {
119 frontier.extend(input_frontier.frontier().iter().copied());
120 }
121
122 let should_emit = |time: &Timestamp| {
123 let beyond_as_of = if with_snapshot {
124 as_of.less_equal(time)
125 } else {
126 as_of.less_than(time)
127 };
128 let before_up_to = !up_to.less_equal(time);
129 beyond_as_of && before_up_to
130 };
131
132 ok_input.for_each(|_, data| {
133 for (row, time, diff) in data.drain(..) {
134 if should_emit(&time) {
135 rows_to_emit.push((time, row, diff));
136 }
137 }
138 });
139 err_input.for_each(|_, data| {
140 for (error, time, diff) in data.drain(..) {
141 if should_emit(&time) {
142 errors_to_emit.push((time, error, diff));
143 }
144 }
145 });
146
147 if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut() {
148 subscribe_protocol.send_batch(
149 frontier.clone(),
150 &mut rows_to_emit,
151 &mut errors_to_emit,
152 );
153 }
154
155 if PartialOrder::less_equal(&up_to, &frontier) {
156 finished = true;
157 if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut()
160 {
161 subscribe_protocol.send_batch(
162 Antichain::default(),
163 &mut Vec::new(),
164 &mut Vec::new(),
165 );
166 }
167 }
168 }
169 });
170}
171
/// State of the response protocol for a single subscribe sink.
///
/// Batches are emitted through `send_batch`. If the value is dropped while it
/// still holds the response buffer, its `Drop` impl pushes a
/// `SubscribeResponse::DroppedAt` response.
struct SubscribeProtocol {
    /// ID of the sink; attached to every response pushed to the buffer.
    pub sink_id: GlobalId,
    /// The sink's `as_of` frontier. No batch is sent until the batch upper has
    /// reached this frontier.
    pub sink_as_of: Antichain<Timestamp>,
    /// Shared buffer that responses are pushed into. Set to `None` after a
    /// batch with an empty upper has been sent, which also suppresses the
    /// `DroppedAt` response on drop.
    pub subscribe_response_buffer: Option<Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>>,
    /// Upper of the most recently sent batch; used as the `lower` of the next
    /// batch and reported in `DroppedAt`.
    pub prev_upper: Antichain<Timestamp>,
    /// Column ordering applied to rows within the same timestamp when sorting
    /// a batch. When empty, rows are ordered by their own `Ord` impl.
    pub output: Vec<ColumnOrder>,
    /// Message of the first error that was shipped, if any. Once set, every
    /// subsequent batch carries this error instead of rows.
    pub poison: Option<String>,
}
191
192impl SubscribeProtocol {
193 fn send_batch(
206 &mut self,
207 upper: Antichain<Timestamp>,
208 rows: &mut Vec<(Timestamp, Row, Diff)>,
209 errors: &mut Vec<(Timestamp, DataflowError, Diff)>,
210 ) {
211 if !PartialOrder::less_equal(&self.sink_as_of, &upper) || upper == self.prev_upper {
215 return;
216 }
217
218 let order = self.output.as_slice();
220 if order.is_empty() {
221 rows.sort_unstable_by(|(t0, r0, _), (t1, r1, _)| {
222 t0.cmp(t1).then_with(|| r0.cmp(r1)).reverse()
224 });
225 } else {
226 let mut left_datum_vec = mz_repr::DatumVec::new();
227 let mut right_datum_vec = mz_repr::DatumVec::new();
228 rows.sort_unstable_by(|(t0, r0, _), (t1, r1, _)| {
229 t0.cmp(t1)
231 .then_with(|| {
232 let dv0 = left_datum_vec.borrow_with(r0.as_row_ref());
233 let dv1 = right_datum_vec.borrow_with(r1.as_row_ref());
234 compare_columns(order, &dv0, &dv1, || r0.cmp(r1))
235 })
236 .reverse()
237 });
238 }
239 consolidate_updates(errors);
240
241 let ship_rows = {
242 let split_at = rows.partition_point(|(t, _, _)| upper.less_equal(t));
245 let ship_updates = rows[split_at..]
246 .iter()
247 .rev()
248 .map(|(t, r, d)| (r.as_row_ref(), t, *d));
249 let len = rows[split_at..].len();
250 let byte_len = len * 32;
253 let mut builder = UpdateCollection::builder(byte_len, len);
254 for update in iter::consolidate_update_iter(ship_updates) {
255 builder.push(update);
256 }
257 rows.truncate(split_at);
258 builder.build()
259 };
260 let (keep_errors, ship_errors) = errors.drain(..).partition(|u| upper.less_equal(&u.0));
261 *errors = keep_errors;
262
263 let updates = match (&self.poison, ship_errors.first()) {
264 (Some(error), _) => {
265 Err(error.clone())
267 }
268 (None, Some((_, error, _))) => {
269 let error = error.to_string();
271 self.poison = Some(error.clone());
272 Err(error)
273 }
274 (None, None) => {
275 Ok(vec![ship_rows])
277 }
278 };
279
280 let buffer = self
281 .subscribe_response_buffer
282 .as_mut()
283 .expect("The subscribe response buffer is only cleared on drop.");
284
285 buffer.borrow_mut().push((
286 self.sink_id,
287 SubscribeResponse::Batch(SubscribeBatch {
288 lower: self.prev_upper.clone(),
289 upper: upper.clone(),
290 updates,
291 }),
292 ));
293
294 let input_exhausted = upper.is_empty();
295 self.prev_upper = upper;
296 if input_exhausted {
297 self.subscribe_response_buffer = None;
300 }
301 }
302}
303
304impl Drop for SubscribeProtocol {
305 fn drop(&mut self) {
306 if let Some(buffer) = self.subscribe_response_buffer.take() {
307 buffer.borrow_mut().push((
308 self.sink_id,
309 SubscribeResponse::DroppedAt(self.prev_upper.clone()),
310 ));
311 }
312 }
313}