// mz_compute/sink/subscribe.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::any::Any;
use std::cell::RefCell;
use std::ops::DerefMut;
use std::rc::Rc;

use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::{AsCollection, VecCollection};
use mz_compute_client::protocol::response::{SubscribeBatch, SubscribeResponse};
use mz_compute_types::sinks::{ComputeSinkDesc, SubscribeSinkConnection};
use mz_expr::{ColumnOrder, compare_columns};
use mz_ore::iter;
use mz_repr::{Diff, GlobalId, Row, Timestamp, UpdateCollection};
use mz_storage_types::controller::CollectionMetadata;
use mz_timely_util::probe::{Handle, ProbeNotify};
use timely::PartialOrder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::progress::Antichain;
use timely::progress::timestamp::Timestamp as TimelyTimestamp;

use crate::render::StartSignal;
use crate::render::errors::DataflowErrorSer;
use crate::render::sinks::SinkRender;

34impl<'scope> SinkRender<'scope> for SubscribeSinkConnection {
35    fn render_sink(
36        &self,
37        compute_state: &mut crate::compute_state::ComputeState,
38        sink: &ComputeSinkDesc<CollectionMetadata>,
39        sink_id: GlobalId,
40        as_of: Antichain<Timestamp>,
41        _start_signal: StartSignal,
42        sinked_collection: VecCollection<'scope, Timestamp, Row, Diff>,
43        err_collection: VecCollection<'scope, Timestamp, DataflowErrorSer, Diff>,
44        output_probe: &Handle<Timestamp>,
45    ) -> Option<Rc<dyn Any>> {
46        // An encapsulation of the Subscribe response protocol.
47        // Used to send rows and progress messages,
48        // and alert if the dataflow was dropped before completing.
49        let subscribe_protocol_handle = Rc::new(RefCell::new(Some(SubscribeProtocol {
50            sink_id,
51            sink_as_of: as_of.clone(),
52            subscribe_response_buffer: Some(Rc::clone(&compute_state.subscribe_response_buffer)),
53            prev_upper: Antichain::from_elem(Timestamp::minimum()),
54            output: self.output.clone(),
55            poison: None,
56        })));
57        let subscribe_protocol_weak = Rc::downgrade(&subscribe_protocol_handle);
58        let sinked_collection = sinked_collection
59            .inner
60            .probe_notify_with(vec![output_probe.clone()])
61            .as_collection();
62        subscribe(
63            sinked_collection,
64            err_collection,
65            sink_id,
66            sink.with_snapshot,
67            as_of,
68            sink.up_to.clone(),
69            subscribe_protocol_handle,
70        );
71
72        // Inform the coordinator that we have been dropped,
73        // and destroy the subscribe protocol so the sink operator
74        // can't send spurious messages while shutting down.
75        Some(Rc::new(scopeguard::guard((), move |_| {
76            if let Some(subscribe_protocol_handle) = subscribe_protocol_weak.upgrade() {
77                std::mem::drop(subscribe_protocol_handle.borrow_mut().take())
78            }
79        })))
80    }
81}
83fn subscribe<'scope>(
84    sinked_collection: VecCollection<'scope, Timestamp, Row, Diff>,
85    err_collection: VecCollection<'scope, Timestamp, DataflowErrorSer, Diff>,
86    sink_id: GlobalId,
87    with_snapshot: bool,
88    as_of: Antichain<Timestamp>,
89    up_to: Antichain<Timestamp>,
90    subscribe_protocol_handle: Rc<RefCell<Option<SubscribeProtocol>>>,
91) {
92    let name = format!("subscribe-{}", sink_id);
93    let mut op = OperatorBuilder::new(name, sinked_collection.scope());
94    let mut ok_input = op.new_input(sinked_collection.inner, Pipeline);
95    let mut err_input = op.new_input(err_collection.inner, Pipeline);
96
97    op.build(|_cap| {
98        let mut rows_to_emit = Vec::new();
99        let mut errors_to_emit = Vec::new();
100        let mut finished = false;
101
102        move |frontiers| {
103            if finished {
104                // Drain the inputs, to avoid the operator being constantly rescheduled
105                ok_input.for_each(|_, _| {});
106                err_input.for_each(|_, _| {});
107                return;
108            }
109
110            let mut frontier = Antichain::new();
111            for input_frontier in frontiers {
112                frontier.extend(input_frontier.frontier().iter().copied());
113            }
114
115            let should_emit = |time: &Timestamp| {
116                let beyond_as_of = if with_snapshot {
117                    as_of.less_equal(time)
118                } else {
119                    as_of.less_than(time)
120                };
121                let before_up_to = !up_to.less_equal(time);
122                beyond_as_of && before_up_to
123            };
124
125            ok_input.for_each(|_, data| {
126                for (row, time, diff) in data.drain(..) {
127                    if should_emit(&time) {
128                        rows_to_emit.push((time, row, diff));
129                    }
130                }
131            });
132            err_input.for_each(|_, data| {
133                for (error, time, diff) in data.drain(..) {
134                    if should_emit(&time) {
135                        errors_to_emit.push((time, error, diff));
136                    }
137                }
138            });
139
140            if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut() {
141                subscribe_protocol.send_batch(
142                    frontier.clone(),
143                    &mut rows_to_emit,
144                    &mut errors_to_emit,
145                );
146            }
147
148            if PartialOrder::less_equal(&up_to, &frontier) {
149                finished = true;
150                // We are done; indicate this by sending a batch at the
151                // empty frontier.
152                if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut()
153                {
154                    subscribe_protocol.send_batch(
155                        Antichain::default(),
156                        &mut Vec::new(),
157                        &mut Vec::new(),
158                    );
159                }
160            }
161        }
162    });
163}
/// A type that guides the transmission of rows back to the coordinator.
///
/// A protocol instance may `send` rows indefinitely in response to `send_batch` calls.
/// A `send_batch` call advancing the upper to the empty frontier is used to indicate the end of
/// a stream. If the stream is not advanced to the empty frontier, the `Drop` implementation sends
/// an indication that the protocol has finished without completion.
struct SubscribeProtocol {
    /// The ID of the subscribe sink this protocol instance serves.
    pub sink_id: GlobalId,
    /// The sink's `as_of` frontier; no batch is sent before the upper reaches it.
    pub sink_as_of: Antichain<Timestamp>,
    /// Shared buffer through which responses reach the coordinator.
    ///
    /// Cleared (set to `None`) when the final batch at the empty frontier is
    /// sent, which suppresses the `DroppedAt` message in `Drop`.
    pub subscribe_response_buffer: Option<Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>>,
    /// The upper of the last batch sent; serves as the `lower` of the next batch.
    pub prev_upper: Antichain<Timestamp>,
    /// Column ordering applied to rows within each timestamp before shipping;
    /// when empty, rows are ordered by their default `Row` comparison.
    pub output: Vec<ColumnOrder>,
    /// The error poisoning this subscribe, if any.
    ///
    /// As soon as a subscribe has encountered an error, it is poisoned: It will only return the
    /// same error in subsequent batches, until it is dropped. The subscribe protocol currently
    /// does not support retracting errors (database-issues#5182).
    pub poison: Option<String>,
}
impl SubscribeProtocol {
    /// Attempt to send a batch of rows with the given `upper`.
    ///
    /// This method filters the updates to send based on the provided `upper`. Updates are only
    /// sent when their times are before `upper`. If `upper` has not advanced far enough, no batch
    /// will be sent. `rows` and `errors` that have been sent are drained from their respective
    /// vectors, only entries that have not been sent remain after the call returns. The caller is
    /// expected to re-submit these entries, potentially along with new ones, at a later `upper`.
    ///
    /// The subscribe protocol only supports reporting a single error. Because of this, it will
    /// only actually send the first error received in a `SubscribeResponse`. Subsequent errors are
    /// dropped. To simplify life for the caller, this method still maintains the illusion that
    /// `errors` are handled the same way as `rows`.
    fn send_batch(
        &mut self,
        upper: Antichain<Timestamp>,
        rows: &mut Vec<(Timestamp, Row, Diff)>,
        errors: &mut Vec<(Timestamp, DataflowErrorSer, Diff)>,
    ) {
        // Only send a batch if both conditions hold:
        //  a) `upper` has reached or passed the sink's `as_of` frontier.
        //  b) `upper` is different from when we last sent a batch.
        if !PartialOrder::less_equal(&self.sink_as_of, &upper) || upper == self.prev_upper {
            return;
        }

        // The compute protocol requires us to only send out consolidated batches.
        let order = self.output.as_slice();
        if order.is_empty() {
            // No explicit output ordering: sort by (time, row).
            rows.sort_unstable_by(|(t0, r0, _), (t1, r1, _)| {
                // NB: sort in reverse, so it's cheaper to peek off the tail.
                t0.cmp(t1).then_with(|| r0.cmp(r1)).reverse()
            });
        } else {
            // An explicit output ordering was requested: within each time,
            // compare by the specified columns, falling back to whole-row
            // comparison to break ties (keeping the order total).
            let mut left_datum_vec = mz_repr::DatumVec::new();
            let mut right_datum_vec = mz_repr::DatumVec::new();
            rows.sort_unstable_by(|(t0, r0, _), (t1, r1, _)| {
                // NB: sort in reverse, so it's cheaper to peek off the tail.
                t0.cmp(t1)
                    .then_with(|| {
                        let dv0 = left_datum_vec.borrow_with(r0.as_row_ref());
                        let dv1 = right_datum_vec.borrow_with(r1.as_row_ref());
                        compare_columns(order, &dv0, &dv1, || r0.cmp(r1))
                    })
                    .reverse()
            });
        }
        consolidate_updates(errors);

        let ship_rows = {
            // Chop off the tail of the reverse-sorted buffer (ie. the prefix we care about) and ship
            // it, preserving the rest of the values for future iterations.
            //
            // Since rows are sorted by time in reverse, the entries with times
            // not yet beyond `upper` form a prefix; everything after
            // `split_at` has a time before `upper` and can be shipped.
            let split_at = rows.partition_point(|(t, _, _)| upper.less_equal(t));
            let ship_updates = rows[split_at..]
                .iter()
                .rev()
                .map(|(t, r, d)| (r.as_row_ref(), t, *d));
            let len = rows[split_at..].len();
            // We can't estimate the total size of the consolidated bytes exactly without extra work,
            // so for now we initialize to the length times a small constant factor.
            let byte_len = len * 32;
            let mut builder = UpdateCollection::builder(byte_len, len);
            for update in iter::consolidate_update_iter(ship_updates) {
                builder.push(update);
            }
            rows.truncate(split_at);
            builder.build()
        };
        // Split errors the same way: keep those at or beyond `upper`, ship the rest.
        let (keep_errors, ship_errors) = errors.drain(..).partition(|u| upper.less_equal(&u.0));
        *errors = keep_errors;

        let updates = match (&self.poison, ship_errors.first()) {
            (Some(error), _) => {
                // The subscribe is poisoned; keep sending the same error.
                Err(error.clone())
            }
            (None, Some((_, error, _))) => {
                // The subscribe encountered its first error; poison it.
                let error = error.to_string();
                self.poison = Some(error.clone());
                Err(error)
            }
            (None, None) => {
                // No error encountered so far; ship the rows we have!
                Ok(vec![ship_rows])
            }
        };

        let buffer = self
            .subscribe_response_buffer
            .as_mut()
            .expect("The subscribe response buffer is only cleared on drop.");

        buffer.borrow_mut().push((
            self.sink_id,
            SubscribeResponse::Batch(SubscribeBatch {
                lower: self.prev_upper.clone(),
                upper: upper.clone(),
                updates,
            }),
        ));

        let input_exhausted = upper.is_empty();
        self.prev_upper = upper;
        if input_exhausted {
            // The dataflow's input has been exhausted; clear the channel,
            // to avoid sending `SubscribeResponse::DroppedAt`.
            self.subscribe_response_buffer = None;
        }
    }
}
297impl Drop for SubscribeProtocol {
298    fn drop(&mut self) {
299        if let Some(buffer) = self.subscribe_response_buffer.take() {
300            buffer.borrow_mut().push((
301                self.sink_id,
302                SubscribeResponse::DroppedAt(self.prev_upper.clone()),
303            ));
304        }
305    }
306}