1use std::any::Any;
11use std::cell::RefCell;
12use std::ops::DerefMut;
13use std::rc::Rc;
14
15use differential_dataflow::consolidation::consolidate_updates;
16use differential_dataflow::{AsCollection, Collection};
17use mz_compute_client::protocol::response::{SubscribeBatch, SubscribeResponse};
18use mz_compute_types::sinks::{ComputeSinkDesc, SubscribeSinkConnection};
19use mz_repr::{Diff, GlobalId, Row, Timestamp};
20use mz_storage_types::controller::CollectionMetadata;
21use mz_storage_types::errors::DataflowError;
22use mz_timely_util::probe::{Handle, ProbeNotify};
23use timely::PartialOrder;
24use timely::dataflow::Scope;
25use timely::dataflow::channels::pact::Pipeline;
26use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
27use timely::progress::Antichain;
28use timely::progress::timestamp::Timestamp as TimelyTimestamp;
29
30use crate::render::StartSignal;
31use crate::render::sinks::SinkRender;
32
33impl<G> SinkRender<G> for SubscribeSinkConnection
34where
35 G: Scope<Timestamp = Timestamp>,
36{
37 fn render_sink(
38 &self,
39 compute_state: &mut crate::compute_state::ComputeState,
40 sink: &ComputeSinkDesc<CollectionMetadata>,
41 sink_id: GlobalId,
42 as_of: Antichain<Timestamp>,
43 _start_signal: StartSignal,
44 sinked_collection: Collection<G, Row, Diff>,
45 err_collection: Collection<G, DataflowError, Diff>,
46 _ct_times: Option<Collection<G, (), Diff>>,
47 output_probe: &Handle<Timestamp>,
48 ) -> Option<Rc<dyn Any>> {
49 let subscribe_protocol_handle = Rc::new(RefCell::new(Some(SubscribeProtocol {
53 sink_id,
54 sink_as_of: as_of.clone(),
55 subscribe_response_buffer: Some(Rc::clone(&compute_state.subscribe_response_buffer)),
56 prev_upper: Antichain::from_elem(Timestamp::minimum()),
57 poison: None,
58 })));
59 let subscribe_protocol_weak = Rc::downgrade(&subscribe_protocol_handle);
60 let sinked_collection = sinked_collection
61 .inner
62 .probe_notify_with(vec![output_probe.clone()])
63 .as_collection();
64 subscribe(
65 sinked_collection,
66 err_collection,
67 sink_id,
68 sink.with_snapshot,
69 as_of,
70 sink.up_to.clone(),
71 subscribe_protocol_handle,
72 );
73
74 Some(Rc::new(scopeguard::guard((), move |_| {
78 if let Some(subscribe_protocol_handle) = subscribe_protocol_weak.upgrade() {
79 std::mem::drop(subscribe_protocol_handle.borrow_mut().take())
80 }
81 })))
82 }
83}
84
85fn subscribe<G>(
86 sinked_collection: Collection<G, Row, Diff>,
87 err_collection: Collection<G, DataflowError, Diff>,
88 sink_id: GlobalId,
89 with_snapshot: bool,
90 as_of: Antichain<G::Timestamp>,
91 up_to: Antichain<G::Timestamp>,
92 subscribe_protocol_handle: Rc<RefCell<Option<SubscribeProtocol>>>,
93) where
94 G: Scope<Timestamp = Timestamp>,
95{
96 let name = format!("subscribe-{}", sink_id);
97 let mut op = OperatorBuilder::new(name, sinked_collection.scope());
98 let mut ok_input = op.new_input(&sinked_collection.inner, Pipeline);
99 let mut err_input = op.new_input(&err_collection.inner, Pipeline);
100
101 op.build(|_cap| {
102 let mut rows_to_emit = Vec::new();
103 let mut errors_to_emit = Vec::new();
104 let mut finished = false;
105
106 move |frontiers| {
107 if finished {
108 ok_input.for_each(|_, _| {});
110 err_input.for_each(|_, _| {});
111 return;
112 }
113
114 let mut frontier = Antichain::new();
115 for input_frontier in frontiers {
116 frontier.extend(input_frontier.frontier().iter().copied());
117 }
118
119 let should_emit = |time: &Timestamp| {
120 let beyond_as_of = if with_snapshot {
121 as_of.less_equal(time)
122 } else {
123 as_of.less_than(time)
124 };
125 let before_up_to = !up_to.less_equal(time);
126 beyond_as_of && before_up_to
127 };
128
129 ok_input.for_each(|_, data| {
130 for (row, time, diff) in data.drain(..) {
131 if should_emit(&time) {
132 rows_to_emit.push((time, row, diff));
133 }
134 }
135 });
136 err_input.for_each(|_, data| {
137 for (error, time, diff) in data.drain(..) {
138 if should_emit(&time) {
139 errors_to_emit.push((time, error, diff));
140 }
141 }
142 });
143
144 if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut() {
145 subscribe_protocol.send_batch(
146 frontier.clone(),
147 &mut rows_to_emit,
148 &mut errors_to_emit,
149 );
150 }
151
152 if PartialOrder::less_equal(&up_to, &frontier) {
153 finished = true;
154 if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut()
157 {
158 subscribe_protocol.send_batch(
159 Antichain::default(),
160 &mut Vec::new(),
161 &mut Vec::new(),
162 );
163 }
164 }
165 }
166 });
167}
168
/// State used to report a subscribe's output as `SubscribeResponse`s.
///
/// Batches are produced by `send_batch`. When the value is dropped while it still holds its
/// response buffer, it reports `SubscribeResponse::DroppedAt`.
struct SubscribeProtocol {
    /// Identifier pushed alongside every response.
    pub sink_id: GlobalId,
    /// No batch is sent until the upper frontier has reached this frontier.
    pub sink_as_of: Antichain<Timestamp>,
    /// Shared buffer that responses are pushed into.
    ///
    /// It becomes `None` once the batch at the empty frontier has been sent, or when the value
    /// is dropped. While it is `None`, dropping reports nothing.
    pub subscribe_response_buffer: Option<Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>>,
    /// Upper of the most recently sent batch, which is also the lower of the next one.
    ///
    /// It is also the frontier reported in `DroppedAt`.
    pub prev_upper: Antichain<Timestamp>,
    /// The first error message reported, if any.
    ///
    /// Once set, every later batch carries this error instead of rows.
    pub poison: Option<String>,
}
187
188impl SubscribeProtocol {
189 fn send_batch(
202 &mut self,
203 upper: Antichain<Timestamp>,
204 rows: &mut Vec<(Timestamp, Row, Diff)>,
205 errors: &mut Vec<(Timestamp, DataflowError, Diff)>,
206 ) {
207 if !PartialOrder::less_equal(&self.sink_as_of, &upper) || upper == self.prev_upper {
211 return;
212 }
213
214 consolidate_updates(rows);
216 consolidate_updates(errors);
217
218 let (keep_rows, ship_rows) = rows.drain(..).partition(|u| upper.less_equal(&u.0));
219 let (keep_errors, ship_errors) = errors.drain(..).partition(|u| upper.less_equal(&u.0));
220 *rows = keep_rows;
221 *errors = keep_errors;
222
223 let updates = match (&self.poison, ship_errors.first()) {
224 (Some(error), _) => {
225 Err(error.clone())
227 }
228 (None, Some((_, error, _))) => {
229 let error = error.to_string();
231 self.poison = Some(error.clone());
232 Err(error)
233 }
234 (None, None) => {
235 Ok(ship_rows)
237 }
238 };
239
240 let buffer = self
241 .subscribe_response_buffer
242 .as_mut()
243 .expect("The subscribe response buffer is only cleared on drop.");
244
245 buffer.borrow_mut().push((
246 self.sink_id,
247 SubscribeResponse::Batch(SubscribeBatch {
248 lower: self.prev_upper.clone(),
249 upper: upper.clone(),
250 updates,
251 }),
252 ));
253
254 let input_exhausted = upper.is_empty();
255 self.prev_upper = upper;
256 if input_exhausted {
257 self.subscribe_response_buffer = None;
260 }
261 }
262}
263
264impl Drop for SubscribeProtocol {
265 fn drop(&mut self) {
266 if let Some(buffer) = self.subscribe_response_buffer.take() {
267 buffer.borrow_mut().push((
268 self.sink_id,
269 SubscribeResponse::DroppedAt(self.prev_upper.clone()),
270 ));
271 }
272 }
273}