// mz_compute/sink/subscribe.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10use std::any::Any;
11use std::cell::RefCell;
12use std::ops::DerefMut;
13use std::rc::Rc;
14
15use differential_dataflow::consolidation::consolidate_updates;
16use differential_dataflow::{AsCollection, VecCollection};
17use mz_compute_client::protocol::response::{SubscribeBatch, SubscribeResponse};
18use mz_compute_types::sinks::{ComputeSinkDesc, SubscribeSinkConnection};
19use mz_expr::{ColumnOrder, compare_columns};
20use mz_ore::iter;
21use mz_repr::{Diff, GlobalId, Row, Timestamp, UpdateCollection};
22use mz_storage_types::controller::CollectionMetadata;
23use mz_storage_types::errors::DataflowError;
24use mz_timely_util::probe::{Handle, ProbeNotify};
25use timely::PartialOrder;
26use timely::dataflow::Scope;
27use timely::dataflow::channels::pact::Pipeline;
28use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
29use timely::progress::Antichain;
30use timely::progress::timestamp::Timestamp as TimelyTimestamp;
32use crate::render::StartSignal;
33use crate::render::sinks::SinkRender;
35impl<G> SinkRender<G> for SubscribeSinkConnection
36where
37    G: Scope<Timestamp = Timestamp>,
38{
39    fn render_sink(
40        &self,
41        compute_state: &mut crate::compute_state::ComputeState,
42        sink: &ComputeSinkDesc<CollectionMetadata>,
43        sink_id: GlobalId,
44        as_of: Antichain<Timestamp>,
45        _start_signal: StartSignal,
46        sinked_collection: VecCollection<G, Row, Diff>,
47        err_collection: VecCollection<G, DataflowError, Diff>,
48        _ct_times: Option<VecCollection<G, (), Diff>>,
49        output_probe: &Handle<Timestamp>,
50    ) -> Option<Rc<dyn Any>> {
51        // An encapsulation of the Subscribe response protocol.
52        // Used to send rows and progress messages,
53        // and alert if the dataflow was dropped before completing.
54        let subscribe_protocol_handle = Rc::new(RefCell::new(Some(SubscribeProtocol {
55            sink_id,
56            sink_as_of: as_of.clone(),
57            subscribe_response_buffer: Some(Rc::clone(&compute_state.subscribe_response_buffer)),
58            prev_upper: Antichain::from_elem(Timestamp::minimum()),
59            output: self.output.clone(),
60            poison: None,
61        })));
62        let subscribe_protocol_weak = Rc::downgrade(&subscribe_protocol_handle);
63        let sinked_collection = sinked_collection
64            .inner
65            .probe_notify_with(vec![output_probe.clone()])
66            .as_collection();
67        subscribe(
68            sinked_collection,
69            err_collection,
70            sink_id,
71            sink.with_snapshot,
72            as_of,
73            sink.up_to.clone(),
74            subscribe_protocol_handle,
75        );
76
77        // Inform the coordinator that we have been dropped,
78        // and destroy the subscribe protocol so the sink operator
79        // can't send spurious messages while shutting down.
80        Some(Rc::new(scopeguard::guard((), move |_| {
81            if let Some(subscribe_protocol_handle) = subscribe_protocol_weak.upgrade() {
82                std::mem::drop(subscribe_protocol_handle.borrow_mut().take())
83            }
84        })))
85    }
86}
88fn subscribe<G>(
89    sinked_collection: VecCollection<G, Row, Diff>,
90    err_collection: VecCollection<G, DataflowError, Diff>,
91    sink_id: GlobalId,
92    with_snapshot: bool,
93    as_of: Antichain<G::Timestamp>,
94    up_to: Antichain<G::Timestamp>,
95    subscribe_protocol_handle: Rc<RefCell<Option<SubscribeProtocol>>>,
96) where
97    G: Scope<Timestamp = Timestamp>,
98{
99    let name = format!("subscribe-{}", sink_id);
100    let mut op = OperatorBuilder::new(name, sinked_collection.scope());
101    let mut ok_input = op.new_input(sinked_collection.inner, Pipeline);
102    let mut err_input = op.new_input(err_collection.inner, Pipeline);
103
104    op.build(|_cap| {
105        let mut rows_to_emit = Vec::new();
106        let mut errors_to_emit = Vec::new();
107        let mut finished = false;
108
109        move |frontiers| {
110            if finished {
111                // Drain the inputs, to avoid the operator being constantly rescheduled
112                ok_input.for_each(|_, _| {});
113                err_input.for_each(|_, _| {});
114                return;
115            }
116
117            let mut frontier = Antichain::new();
118            for input_frontier in frontiers {
119                frontier.extend(input_frontier.frontier().iter().copied());
120            }
121
122            let should_emit = |time: &Timestamp| {
123                let beyond_as_of = if with_snapshot {
124                    as_of.less_equal(time)
125                } else {
126                    as_of.less_than(time)
127                };
128                let before_up_to = !up_to.less_equal(time);
129                beyond_as_of && before_up_to
130            };
131
132            ok_input.for_each(|_, data| {
133                for (row, time, diff) in data.drain(..) {
134                    if should_emit(&time) {
135                        rows_to_emit.push((time, row, diff));
136                    }
137                }
138            });
139            err_input.for_each(|_, data| {
140                for (error, time, diff) in data.drain(..) {
141                    if should_emit(&time) {
142                        errors_to_emit.push((time, error, diff));
143                    }
144                }
145            });
146
147            if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut() {
148                subscribe_protocol.send_batch(
149                    frontier.clone(),
150                    &mut rows_to_emit,
151                    &mut errors_to_emit,
152                );
153            }
154
155            if PartialOrder::less_equal(&up_to, &frontier) {
156                finished = true;
157                // We are done; indicate this by sending a batch at the
158                // empty frontier.
159                if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut()
160                {
161                    subscribe_protocol.send_batch(
162                        Antichain::default(),
163                        &mut Vec::new(),
164                        &mut Vec::new(),
165                    );
166                }
167            }
168        }
169    });
170}
/// A type that guides the transmission of rows back to the coordinator.
///
/// A protocol instance may `send` rows indefinitely in response to `send_batch` calls.
/// A `send_batch` call advancing the upper to the empty frontier is used to indicate the end of
/// a stream. If the stream is not advanced to the empty frontier, the `Drop` implementation sends
/// an indication that the protocol has finished without completion.
struct SubscribeProtocol {
    /// The ID of the subscribe sink this protocol reports for.
    pub sink_id: GlobalId,
    /// The sink's as-of frontier; no batch is sent before the upper reaches it.
    pub sink_as_of: Antichain<Timestamp>,
    /// Shared buffer through which responses are handed to the compute state.
    /// Cleared (set to `None`) once the upper advances to the empty frontier,
    /// which suppresses the `DroppedAt` message in the `Drop` implementation.
    pub subscribe_response_buffer: Option<Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>>,
    /// The upper reported with the previously sent batch. Used as the `lower`
    /// of the next batch and to avoid re-sending a batch at an unchanged upper.
    pub prev_upper: Antichain<Timestamp>,
    /// The column order by which emitted rows are sorted; when empty, rows are
    /// sorted by their full value instead.
    pub output: Vec<ColumnOrder>,
    /// The error poisoning this subscribe, if any.
    ///
    /// As soon as a subscribe has encountered an error, it is poisoned: It will only return the
    /// same error in subsequent batches, until it is dropped. The subscribe protocol currently
    /// does not support retracting errors (database-issues#5182).
    pub poison: Option<String>,
}
impl SubscribeProtocol {
    /// Attempt to send a batch of rows with the given `upper`.
    ///
    /// This method filters the updates to send based on the provided `upper`. Updates are only
    /// sent when their times are before `upper`. If `upper` has not advanced far enough, no batch
    /// will be sent. `rows` and `errors` that have been sent are drained from their respective
    /// vectors, only entries that have not been sent remain after the call returns. The caller is
    /// expected to re-submit these entries, potentially along with new ones, at a later `upper`.
    ///
    /// The subscribe protocol only supports reporting a single error. Because of this, it will
    /// only actually send the first error received in a `SubscribeResponse`. Subsequent errors are
    /// dropped. To simplify life for the caller, this method still maintains the illusion that
    /// `errors` are handled the same way as `rows`.
    fn send_batch(
        &mut self,
        upper: Antichain<Timestamp>,
        rows: &mut Vec<(Timestamp, Row, Diff)>,
        errors: &mut Vec<(Timestamp, DataflowError, Diff)>,
    ) {
        // Only send a batch if both conditions hold:
        //  a) `upper` has reached or passed the sink's `as_of` frontier.
        //  b) `upper` is different from when we last sent a batch.
        if !PartialOrder::less_equal(&self.sink_as_of, &upper) || upper == self.prev_upper {
            return;
        }

        // The compute protocol requires us to only send out consolidated batches.
        // Sort by time first, then by the requested column order (or the full
        // row when no order is given), so equal updates become adjacent.
        let order = self.output.as_slice();
        if order.is_empty() {
            rows.sort_unstable_by(|(t0, r0, _), (t1, r1, _)| {
                // NB: sort in reverse, so it's cheaper to peek off the tail.
                t0.cmp(t1).then_with(|| r0.cmp(r1)).reverse()
            });
        } else {
            let mut left_datum_vec = mz_repr::DatumVec::new();
            let mut right_datum_vec = mz_repr::DatumVec::new();
            rows.sort_unstable_by(|(t0, r0, _), (t1, r1, _)| {
                // NB: sort in reverse, so it's cheaper to peek off the tail.
                t0.cmp(t1)
                    .then_with(|| {
                        let dv0 = left_datum_vec.borrow_with(r0.as_row_ref());
                        let dv1 = right_datum_vec.borrow_with(r1.as_row_ref());
                        // Full-row comparison breaks ties among rows that are
                        // equal under `order`, keeping the sort total.
                        compare_columns(order, &dv0, &dv1, || r0.cmp(r1))
                    })
                    .reverse()
            });
        }
        consolidate_updates(errors);

        let ship_rows = {
            // Chop off the tail of the reverse-sorted buffer (i.e. the prefix we care about) and
            // ship it, preserving the rest of the values for future iterations.
            // Since rows are sorted by descending time, updates not yet
            // beneath `upper` form the front of the buffer.
            let split_at = rows.partition_point(|(t, _, _)| upper.less_equal(t));
            let ship_updates = rows[split_at..]
                .iter()
                .rev()
                .map(|(t, r, d)| (r.as_row_ref(), t, *d));
            let len = rows[split_at..].len();
            // We can't estimate the total size of the consolidated bytes exactly without extra work,
            // so for now we initialize to the length times a small constant factor.
            let byte_len = len * 32;
            let mut builder = UpdateCollection::builder(byte_len, len);
            for update in iter::consolidate_update_iter(ship_updates) {
                builder.push(update);
            }
            rows.truncate(split_at);
            builder.build()
        };
        // Errors at times not yet beneath `upper` are retained for later calls.
        let (keep_errors, ship_errors) = errors.drain(..).partition(|u| upper.less_equal(&u.0));
        *errors = keep_errors;

        let updates = match (&self.poison, ship_errors.first()) {
            (Some(error), _) => {
                // The subscribe is poisoned; keep sending the same error.
                Err(error.clone())
            }
            (None, Some((_, error, _))) => {
                // The subscribe encountered its first error; poison it.
                let error = error.to_string();
                self.poison = Some(error.clone());
                Err(error)
            }
            (None, None) => {
                // No error encountered so far; ship the rows we have!
                Ok(vec![ship_rows])
            }
        };

        let buffer = self
            .subscribe_response_buffer
            .as_mut()
            .expect("The subscribe response buffer is only cleared on drop.");

        buffer.borrow_mut().push((
            self.sink_id,
            SubscribeResponse::Batch(SubscribeBatch {
                lower: self.prev_upper.clone(),
                upper: upper.clone(),
                updates,
            }),
        ));

        let input_exhausted = upper.is_empty();
        self.prev_upper = upper;
        if input_exhausted {
            // The dataflow's input has been exhausted; clear the channel,
            // to avoid sending `SubscribeResponse::DroppedAt`.
            self.subscribe_response_buffer = None;
        }
    }
}
304impl Drop for SubscribeProtocol {
305    fn drop(&mut self) {
306        if let Some(buffer) = self.subscribe_response_buffer.take() {
307            buffer.borrow_mut().push((
308                self.sink_id,
309                SubscribeResponse::DroppedAt(self.prev_upper.clone()),
310            ));
311        }
312    }
313}