// mz_compute/sink/subscribe.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::any::Any;
11use std::cell::RefCell;
12use std::ops::DerefMut;
13use std::rc::Rc;
14
15use differential_dataflow::consolidation::consolidate_updates;
16use differential_dataflow::{AsCollection, VecCollection};
17use mz_compute_client::protocol::response::{SubscribeBatch, SubscribeResponse};
18use mz_compute_types::sinks::{ComputeSinkDesc, SubscribeSinkConnection};
19use mz_expr::{ColumnOrder, compare_columns};
20use mz_ore::iter;
21use mz_repr::{Diff, GlobalId, Row, Timestamp, UpdateCollection};
22use mz_storage_types::controller::CollectionMetadata;
23use mz_storage_types::errors::DataflowError;
24use mz_timely_util::probe::{Handle, ProbeNotify};
25use timely::PartialOrder;
26use timely::dataflow::channels::pact::Pipeline;
27use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
28use timely::progress::Antichain;
29use timely::progress::timestamp::Timestamp as TimelyTimestamp;
30
31use crate::render::StartSignal;
32use crate::render::sinks::SinkRender;
33
34impl<'scope> SinkRender<'scope> for SubscribeSinkConnection {
35    fn render_sink(
36        &self,
37        compute_state: &mut crate::compute_state::ComputeState,
38        sink: &ComputeSinkDesc<CollectionMetadata>,
39        sink_id: GlobalId,
40        as_of: Antichain<Timestamp>,
41        _start_signal: StartSignal,
42        sinked_collection: VecCollection<'scope, Timestamp, Row, Diff>,
43        err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
44        _ct_times: Option<VecCollection<'scope, Timestamp, (), Diff>>,
45        output_probe: &Handle<Timestamp>,
46    ) -> Option<Rc<dyn Any>> {
47        // An encapsulation of the Subscribe response protocol.
48        // Used to send rows and progress messages,
49        // and alert if the dataflow was dropped before completing.
50        let subscribe_protocol_handle = Rc::new(RefCell::new(Some(SubscribeProtocol {
51            sink_id,
52            sink_as_of: as_of.clone(),
53            subscribe_response_buffer: Some(Rc::clone(&compute_state.subscribe_response_buffer)),
54            prev_upper: Antichain::from_elem(Timestamp::minimum()),
55            output: self.output.clone(),
56            poison: None,
57        })));
58        let subscribe_protocol_weak = Rc::downgrade(&subscribe_protocol_handle);
59        let sinked_collection = sinked_collection
60            .inner
61            .probe_notify_with(vec![output_probe.clone()])
62            .as_collection();
63        subscribe(
64            sinked_collection,
65            err_collection,
66            sink_id,
67            sink.with_snapshot,
68            as_of,
69            sink.up_to.clone(),
70            subscribe_protocol_handle,
71        );
72
73        // Inform the coordinator that we have been dropped,
74        // and destroy the subscribe protocol so the sink operator
75        // can't send spurious messages while shutting down.
76        Some(Rc::new(scopeguard::guard((), move |_| {
77            if let Some(subscribe_protocol_handle) = subscribe_protocol_weak.upgrade() {
78                std::mem::drop(subscribe_protocol_handle.borrow_mut().take())
79            }
80        })))
81    }
82}
83
84fn subscribe<'scope>(
85    sinked_collection: VecCollection<'scope, Timestamp, Row, Diff>,
86    err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
87    sink_id: GlobalId,
88    with_snapshot: bool,
89    as_of: Antichain<Timestamp>,
90    up_to: Antichain<Timestamp>,
91    subscribe_protocol_handle: Rc<RefCell<Option<SubscribeProtocol>>>,
92) {
93    let name = format!("subscribe-{}", sink_id);
94    let mut op = OperatorBuilder::new(name, sinked_collection.scope());
95    let mut ok_input = op.new_input(sinked_collection.inner, Pipeline);
96    let mut err_input = op.new_input(err_collection.inner, Pipeline);
97
98    op.build(|_cap| {
99        let mut rows_to_emit = Vec::new();
100        let mut errors_to_emit = Vec::new();
101        let mut finished = false;
102
103        move |frontiers| {
104            if finished {
105                // Drain the inputs, to avoid the operator being constantly rescheduled
106                ok_input.for_each(|_, _| {});
107                err_input.for_each(|_, _| {});
108                return;
109            }
110
111            let mut frontier = Antichain::new();
112            for input_frontier in frontiers {
113                frontier.extend(input_frontier.frontier().iter().copied());
114            }
115
116            let should_emit = |time: &Timestamp| {
117                let beyond_as_of = if with_snapshot {
118                    as_of.less_equal(time)
119                } else {
120                    as_of.less_than(time)
121                };
122                let before_up_to = !up_to.less_equal(time);
123                beyond_as_of && before_up_to
124            };
125
126            ok_input.for_each(|_, data| {
127                for (row, time, diff) in data.drain(..) {
128                    if should_emit(&time) {
129                        rows_to_emit.push((time, row, diff));
130                    }
131                }
132            });
133            err_input.for_each(|_, data| {
134                for (error, time, diff) in data.drain(..) {
135                    if should_emit(&time) {
136                        errors_to_emit.push((time, error, diff));
137                    }
138                }
139            });
140
141            if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut() {
142                subscribe_protocol.send_batch(
143                    frontier.clone(),
144                    &mut rows_to_emit,
145                    &mut errors_to_emit,
146                );
147            }
148
149            if PartialOrder::less_equal(&up_to, &frontier) {
150                finished = true;
151                // We are done; indicate this by sending a batch at the
152                // empty frontier.
153                if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut()
154                {
155                    subscribe_protocol.send_batch(
156                        Antichain::default(),
157                        &mut Vec::new(),
158                        &mut Vec::new(),
159                    );
160                }
161            }
162        }
163    });
164}
165
/// A type that guides the transmission of rows back to the coordinator.
///
/// A protocol instance may `send` rows indefinitely in response to `send_batch` calls.
/// A `send_batch` call advancing the upper to the empty frontier is used to indicate the end of
/// a stream. If the stream is not advanced to the empty frontier, the `Drop` implementation sends
/// an indication that the protocol has finished without completion.
struct SubscribeProtocol {
    /// The ID of the subscribe sink this protocol reports for.
    pub sink_id: GlobalId,
    /// The sink's as-of frontier; no batch is sent before `upper` reaches it.
    pub sink_as_of: Antichain<Timestamp>,
    /// Shared buffer through which responses are handed to the coordinator.
    /// Cleared (set to `None`) once the stream completes, or on drop, to
    /// suppress any further messages.
    pub subscribe_response_buffer: Option<Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>>,
    /// The `upper` of the most recently sent batch; it becomes the `lower` of
    /// the next batch, and suppresses batches whose `upper` has not advanced.
    pub prev_upper: Antichain<Timestamp>,
    /// The column order by which emitted rows are sorted within a timestamp;
    /// when empty, rows are ordered by their full-row comparison.
    pub output: Vec<ColumnOrder>,
    /// The error poisoning this subscribe, if any.
    ///
    /// As soon as a subscribe has encountered an error, it is poisoned: It will only return the
    /// same error in subsequent batches, until it is dropped. The subscribe protocol currently
    /// does not support retracting errors (database-issues#5182).
    pub poison: Option<String>,
}
185
impl SubscribeProtocol {
    /// Attempt to send a batch of rows with the given `upper`.
    ///
    /// This method filters the updates to send based on the provided `upper`. Updates are only
    /// sent when their times are before `upper`. If `upper` has not advanced far enough, no batch
    /// will be sent. `rows` and `errors` that have been sent are drained from their respective
    /// vectors, only entries that have not been sent remain after the call returns. The caller is
    /// expected to re-submit these entries, potentially along with new ones, at a later `upper`.
    ///
    /// The subscribe protocol only supports reporting a single error. Because of this, it will
    /// only actually send the first error received in a `SubscribeResponse`. Subsequent errors are
    /// dropped. To simplify life for the caller, this method still maintains the illusion that
    /// `errors` are handled the same way as `rows`.
    fn send_batch(
        &mut self,
        upper: Antichain<Timestamp>,
        rows: &mut Vec<(Timestamp, Row, Diff)>,
        errors: &mut Vec<(Timestamp, DataflowError, Diff)>,
    ) {
        // Only send a batch if both conditions hold:
        //  a) `upper` has reached or passed the sink's `as_of` frontier.
        //  b) `upper` is different from when we last sent a batch.
        if !PartialOrder::less_equal(&self.sink_as_of, &upper) || upper == self.prev_upper {
            return;
        }

        // The compute protocol requires us to only send out consolidated batches.
        // Sort rows by (time, order); with no explicit output order, fall back
        // to comparing the rows themselves.
        let order = self.output.as_slice();
        if order.is_empty() {
            rows.sort_unstable_by(|(t0, r0, _), (t1, r1, _)| {
                // NB: sort in reverse, so it's cheaper to peek off the tail.
                t0.cmp(t1).then_with(|| r0.cmp(r1)).reverse()
            });
        } else {
            let mut left_datum_vec = mz_repr::DatumVec::new();
            let mut right_datum_vec = mz_repr::DatumVec::new();
            rows.sort_unstable_by(|(t0, r0, _), (t1, r1, _)| {
                // NB: sort in reverse, so it's cheaper to peek off the tail.
                t0.cmp(t1)
                    .then_with(|| {
                        let dv0 = left_datum_vec.borrow_with(r0.as_row_ref());
                        let dv1 = right_datum_vec.borrow_with(r1.as_row_ref());
                        compare_columns(order, &dv0, &dv1, || r0.cmp(r1))
                    })
                    .reverse()
            });
        }
        consolidate_updates(errors);

        let ship_rows = {
            // Chop off the tail of the reverse-sorted buffer (ie. the prefix we care about) and ship
            // it, preserving the rest of the values for future iterations.
            //
            // Because `rows` is sorted in descending time order, entries whose
            // time is still at or beyond `upper` (not yet ready) form the
            // prefix `rows[..split_at]`; the suffix is ready to ship.
            let split_at = rows.partition_point(|(t, _, _)| upper.less_equal(t));
            // Re-reverse the shippable suffix so updates go out in ascending
            // (time, row) order, as `(row, time, diff)` references.
            let ship_updates = rows[split_at..]
                .iter()
                .rev()
                .map(|(t, r, d)| (r.as_row_ref(), t, *d));
            let len = rows[split_at..].len();
            // We can't estimate the total size of the consolidated bytes exactly without extra work,
            // so for now we initialize to the length times a small constant factor.
            let byte_len = len * 32;
            let mut builder = UpdateCollection::builder(byte_len, len);
            for update in iter::consolidate_update_iter(ship_updates) {
                builder.push(update);
            }
            rows.truncate(split_at);
            builder.build()
        };
        // Split errors the same way: entries at or beyond `upper` are kept for
        // a later call; the rest are candidates for shipping now.
        let (keep_errors, ship_errors) = errors.drain(..).partition(|u| upper.less_equal(&u.0));
        *errors = keep_errors;

        let updates = match (&self.poison, ship_errors.first()) {
            (Some(error), _) => {
                // The subscribe is poisoned; keep sending the same error.
                Err(error.clone())
            }
            (None, Some((_, error, _))) => {
                // The subscribe encountered its first error; poison it.
                let error = error.to_string();
                self.poison = Some(error.clone());
                Err(error)
            }
            (None, None) => {
                // No error encountered so far; ship the rows we have!
                Ok(vec![ship_rows])
            }
        };

        let buffer = self
            .subscribe_response_buffer
            .as_mut()
            .expect("The subscribe response buffer is only cleared on drop.");

        buffer.borrow_mut().push((
            self.sink_id,
            SubscribeResponse::Batch(SubscribeBatch {
                lower: self.prev_upper.clone(),
                upper: upper.clone(),
                updates,
            }),
        ));

        let input_exhausted = upper.is_empty();
        self.prev_upper = upper;
        if input_exhausted {
            // The dataflow's input has been exhausted; clear the channel,
            // to avoid sending `SubscribeResponse::DroppedAt`.
            self.subscribe_response_buffer = None;
        }
    }
}
297
298impl Drop for SubscribeProtocol {
299    fn drop(&mut self) {
300        if let Some(buffer) = self.subscribe_response_buffer.take() {
301            buffer.borrow_mut().push((
302                self.sink_id,
303                SubscribeResponse::DroppedAt(self.prev_upper.clone()),
304            ));
305        }
306    }
307}