1use std::any::Any;
11use std::cell::RefCell;
12use std::ops::DerefMut;
13use std::rc::Rc;
14
15use differential_dataflow::consolidation::consolidate_updates;
16use differential_dataflow::{AsCollection, VecCollection};
17use mz_compute_client::protocol::response::{SubscribeBatch, SubscribeResponse};
18use mz_compute_types::sinks::{ComputeSinkDesc, SubscribeSinkConnection};
19use mz_expr::{ColumnOrder, compare_columns};
20use mz_ore::iter;
21use mz_repr::{Diff, GlobalId, Row, Timestamp, UpdateCollection};
22use mz_storage_types::controller::CollectionMetadata;
23use mz_storage_types::errors::DataflowError;
24use mz_timely_util::probe::{Handle, ProbeNotify};
25use timely::PartialOrder;
26use timely::dataflow::channels::pact::Pipeline;
27use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
28use timely::progress::Antichain;
29use timely::progress::timestamp::Timestamp as TimelyTimestamp;
30
31use crate::render::StartSignal;
32use crate::render::sinks::SinkRender;
33
34impl<'scope> SinkRender<'scope> for SubscribeSinkConnection {
35 fn render_sink(
36 &self,
37 compute_state: &mut crate::compute_state::ComputeState,
38 sink: &ComputeSinkDesc<CollectionMetadata>,
39 sink_id: GlobalId,
40 as_of: Antichain<Timestamp>,
41 _start_signal: StartSignal,
42 sinked_collection: VecCollection<'scope, Timestamp, Row, Diff>,
43 err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
44 _ct_times: Option<VecCollection<'scope, Timestamp, (), Diff>>,
45 output_probe: &Handle<Timestamp>,
46 ) -> Option<Rc<dyn Any>> {
47 let subscribe_protocol_handle = Rc::new(RefCell::new(Some(SubscribeProtocol {
51 sink_id,
52 sink_as_of: as_of.clone(),
53 subscribe_response_buffer: Some(Rc::clone(&compute_state.subscribe_response_buffer)),
54 prev_upper: Antichain::from_elem(Timestamp::minimum()),
55 output: self.output.clone(),
56 poison: None,
57 })));
58 let subscribe_protocol_weak = Rc::downgrade(&subscribe_protocol_handle);
59 let sinked_collection = sinked_collection
60 .inner
61 .probe_notify_with(vec![output_probe.clone()])
62 .as_collection();
63 subscribe(
64 sinked_collection,
65 err_collection,
66 sink_id,
67 sink.with_snapshot,
68 as_of,
69 sink.up_to.clone(),
70 subscribe_protocol_handle,
71 );
72
73 Some(Rc::new(scopeguard::guard((), move |_| {
77 if let Some(subscribe_protocol_handle) = subscribe_protocol_weak.upgrade() {
78 std::mem::drop(subscribe_protocol_handle.borrow_mut().take())
79 }
80 })))
81 }
82}
83
84fn subscribe<'scope>(
85 sinked_collection: VecCollection<'scope, Timestamp, Row, Diff>,
86 err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
87 sink_id: GlobalId,
88 with_snapshot: bool,
89 as_of: Antichain<Timestamp>,
90 up_to: Antichain<Timestamp>,
91 subscribe_protocol_handle: Rc<RefCell<Option<SubscribeProtocol>>>,
92) {
93 let name = format!("subscribe-{}", sink_id);
94 let mut op = OperatorBuilder::new(name, sinked_collection.scope());
95 let mut ok_input = op.new_input(sinked_collection.inner, Pipeline);
96 let mut err_input = op.new_input(err_collection.inner, Pipeline);
97
98 op.build(|_cap| {
99 let mut rows_to_emit = Vec::new();
100 let mut errors_to_emit = Vec::new();
101 let mut finished = false;
102
103 move |frontiers| {
104 if finished {
105 ok_input.for_each(|_, _| {});
107 err_input.for_each(|_, _| {});
108 return;
109 }
110
111 let mut frontier = Antichain::new();
112 for input_frontier in frontiers {
113 frontier.extend(input_frontier.frontier().iter().copied());
114 }
115
116 let should_emit = |time: &Timestamp| {
117 let beyond_as_of = if with_snapshot {
118 as_of.less_equal(time)
119 } else {
120 as_of.less_than(time)
121 };
122 let before_up_to = !up_to.less_equal(time);
123 beyond_as_of && before_up_to
124 };
125
126 ok_input.for_each(|_, data| {
127 for (row, time, diff) in data.drain(..) {
128 if should_emit(&time) {
129 rows_to_emit.push((time, row, diff));
130 }
131 }
132 });
133 err_input.for_each(|_, data| {
134 for (error, time, diff) in data.drain(..) {
135 if should_emit(&time) {
136 errors_to_emit.push((time, error, diff));
137 }
138 }
139 });
140
141 if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut() {
142 subscribe_protocol.send_batch(
143 frontier.clone(),
144 &mut rows_to_emit,
145 &mut errors_to_emit,
146 );
147 }
148
149 if PartialOrder::less_equal(&up_to, &frontier) {
150 finished = true;
151 if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut()
154 {
155 subscribe_protocol.send_batch(
156 Antichain::default(),
157 &mut Vec::new(),
158 &mut Vec::new(),
159 );
160 }
161 }
162 }
163 });
164}
165
/// State used to turn buffered updates into `SubscribeResponse`s for one subscribe sink.
///
/// `send_batch` pushes `SubscribeResponse::Batch` messages into the shared response buffer.
/// A batch whose upper is the empty frontier releases the buffer. If the protocol is dropped
/// while it still holds the buffer, its `Drop` impl pushes `SubscribeResponse::DroppedAt`
/// instead.
struct SubscribeProtocol {
    /// The ID attached to every response pushed into the buffer.
    pub sink_id: GlobalId,
    /// No batch is sent until the offered upper has reached this frontier.
    pub sink_as_of: Antichain<Timestamp>,
    /// The shared buffer responses are pushed into.
    ///
    /// Set to `None` once a batch with an empty upper has been sent, which also keeps `Drop`
    /// from reporting `DroppedAt`.
    pub subscribe_response_buffer: Option<Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>>,
    /// The upper of the most recently sent batch; used as the lower of the next batch and
    /// reported in `DroppedAt`.
    pub prev_upper: Antichain<Timestamp>,
    /// Column ordering applied to rows within the same timestamp before they are shipped.
    /// When empty, rows are ordered by their `Row` ordering instead.
    pub output: Vec<ColumnOrder>,
    /// The stringified first error shipped by this protocol, if any.
    ///
    /// Once set, every subsequent batch carries this same error instead of rows.
    pub poison: Option<String>,
}
185
186impl SubscribeProtocol {
187 fn send_batch(
200 &mut self,
201 upper: Antichain<Timestamp>,
202 rows: &mut Vec<(Timestamp, Row, Diff)>,
203 errors: &mut Vec<(Timestamp, DataflowError, Diff)>,
204 ) {
205 if !PartialOrder::less_equal(&self.sink_as_of, &upper) || upper == self.prev_upper {
209 return;
210 }
211
212 let order = self.output.as_slice();
214 if order.is_empty() {
215 rows.sort_unstable_by(|(t0, r0, _), (t1, r1, _)| {
216 t0.cmp(t1).then_with(|| r0.cmp(r1)).reverse()
218 });
219 } else {
220 let mut left_datum_vec = mz_repr::DatumVec::new();
221 let mut right_datum_vec = mz_repr::DatumVec::new();
222 rows.sort_unstable_by(|(t0, r0, _), (t1, r1, _)| {
223 t0.cmp(t1)
225 .then_with(|| {
226 let dv0 = left_datum_vec.borrow_with(r0.as_row_ref());
227 let dv1 = right_datum_vec.borrow_with(r1.as_row_ref());
228 compare_columns(order, &dv0, &dv1, || r0.cmp(r1))
229 })
230 .reverse()
231 });
232 }
233 consolidate_updates(errors);
234
235 let ship_rows = {
236 let split_at = rows.partition_point(|(t, _, _)| upper.less_equal(t));
239 let ship_updates = rows[split_at..]
240 .iter()
241 .rev()
242 .map(|(t, r, d)| (r.as_row_ref(), t, *d));
243 let len = rows[split_at..].len();
244 let byte_len = len * 32;
247 let mut builder = UpdateCollection::builder(byte_len, len);
248 for update in iter::consolidate_update_iter(ship_updates) {
249 builder.push(update);
250 }
251 rows.truncate(split_at);
252 builder.build()
253 };
254 let (keep_errors, ship_errors) = errors.drain(..).partition(|u| upper.less_equal(&u.0));
255 *errors = keep_errors;
256
257 let updates = match (&self.poison, ship_errors.first()) {
258 (Some(error), _) => {
259 Err(error.clone())
261 }
262 (None, Some((_, error, _))) => {
263 let error = error.to_string();
265 self.poison = Some(error.clone());
266 Err(error)
267 }
268 (None, None) => {
269 Ok(vec![ship_rows])
271 }
272 };
273
274 let buffer = self
275 .subscribe_response_buffer
276 .as_mut()
277 .expect("The subscribe response buffer is only cleared on drop.");
278
279 buffer.borrow_mut().push((
280 self.sink_id,
281 SubscribeResponse::Batch(SubscribeBatch {
282 lower: self.prev_upper.clone(),
283 upper: upper.clone(),
284 updates,
285 }),
286 ));
287
288 let input_exhausted = upper.is_empty();
289 self.prev_upper = upper;
290 if input_exhausted {
291 self.subscribe_response_buffer = None;
294 }
295 }
296}
297
298impl Drop for SubscribeProtocol {
299 fn drop(&mut self) {
300 if let Some(buffer) = self.subscribe_response_buffer.take() {
301 buffer.borrow_mut().push((
302 self.sink_id,
303 SubscribeResponse::DroppedAt(self.prev_upper.clone()),
304 ));
305 }
306 }
307}