mz_compute/sink/subscribe.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::any::Any;
use std::cell::RefCell;
use std::ops::DerefMut;
use std::rc::Rc;

use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::{AsCollection, Collection};
use mz_compute_client::protocol::response::{SubscribeBatch, SubscribeResponse};
use mz_compute_types::sinks::{ComputeSinkDesc, SubscribeSinkConnection};
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::probe::{Handle, ProbeNotify};
use timely::PartialOrder;
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::progress::Antichain;
use timely::progress::timestamp::Timestamp as TimelyTimestamp;

use crate::render::StartSignal;
use crate::render::sinks::SinkRender;

impl<G> SinkRender<G> for SubscribeSinkConnection
where
    G: Scope<Timestamp = Timestamp>,
{
    fn render_sink(
        &self,
        compute_state: &mut crate::compute_state::ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<Timestamp>,
        _start_signal: StartSignal,
        sinked_collection: Collection<G, Row, Diff>,
        err_collection: Collection<G, DataflowError, Diff>,
        _ct_times: Option<Collection<G, (), Diff>>,
        output_probe: &Handle<Timestamp>,
    ) -> Option<Rc<dyn Any>> {
        // An encapsulation of the Subscribe response protocol.
        // Used to send rows and progress messages, and to alert
        // if the dataflow was dropped before completing.
        let subscribe_protocol_handle = Rc::new(RefCell::new(Some(SubscribeProtocol {
            sink_id,
            sink_as_of: as_of.clone(),
            subscribe_response_buffer: Some(Rc::clone(&compute_state.subscribe_response_buffer)),
            prev_upper: Antichain::from_elem(Timestamp::minimum()),
            poison: None,
        })));
        let subscribe_protocol_weak = Rc::downgrade(&subscribe_protocol_handle);
        let sinked_collection = sinked_collection
            .inner
            .probe_notify_with(vec![output_probe.clone()])
            .as_collection();
        subscribe(
            sinked_collection,
            err_collection,
            sink_id,
            sink.with_snapshot,
            as_of,
            sink.up_to.clone(),
            subscribe_protocol_handle,
        );

        // Inform the coordinator that we have been dropped,
        // and destroy the subscribe protocol so the sink operator
        // can't send spurious messages while shutting down.
        Some(Rc::new(scopeguard::guard((), move |_| {
            if let Some(subscribe_protocol_handle) = subscribe_protocol_weak.upgrade() {
                std::mem::drop(subscribe_protocol_handle.borrow_mut().take())
            }
        })))
    }
}

fn subscribe<G>(
    sinked_collection: Collection<G, Row, Diff>,
    err_collection: Collection<G, DataflowError, Diff>,
    sink_id: GlobalId,
    with_snapshot: bool,
    as_of: Antichain<G::Timestamp>,
    up_to: Antichain<G::Timestamp>,
    subscribe_protocol_handle: Rc<RefCell<Option<SubscribeProtocol>>>,
) where
    G: Scope<Timestamp = Timestamp>,
{
    let name = format!("subscribe-{}", sink_id);
    let mut op = OperatorBuilder::new(name, sinked_collection.scope());
    let mut ok_input = op.new_input(&sinked_collection.inner, Pipeline);
    let mut err_input = op.new_input(&err_collection.inner, Pipeline);

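    // Build a sink operator that buffers incoming updates and hands them to the
    // `SubscribeProtocol` once the combined input frontier has passed their times.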
    op.build(|_cap| {
        let mut rows_to_emit = Vec::new();
        let mut errors_to_emit = Vec::new();
        let mut finished = false;

        move |frontiers| {
            if finished {
                // Drain the inputs, to avoid the operator being constantly rescheduled
                ok_input.for_each(|_, _| {});
                err_input.for_each(|_, _| {});
                return;
            }

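            // Compute the combined lower bound of both input frontiers. Updates whose
            // times this frontier has passed are complete and may be shipped.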
            let mut frontier = Antichain::new();
            for input_frontier in frontiers {
                frontier.extend(input_frontier.frontier().iter().copied());
            }

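            // An update should be emitted iff its time lies within the subscribe's
            // requested range: at or after the `as_of` (strictly after, unless a
            // snapshot was requested) and strictly before the `up_to`.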
            let should_emit = |time: &Timestamp| {
                let beyond_as_of = if with_snapshot {
                    as_of.less_equal(time)
                } else {
                    as_of.less_than(time)
                };
                let before_up_to = !up_to.less_equal(time);
                beyond_as_of && before_up_to
            };

            ok_input.for_each(|_, data| {
                for (row, time, diff) in data.drain(..) {
                    if should_emit(&time) {
                        rows_to_emit.push((time, row, diff));
                    }
                }
            });
            err_input.for_each(|_, data| {
                for (error, time, diff) in data.drain(..) {
                    if should_emit(&time) {
                        errors_to_emit.push((time, error, diff));
                    }
                }
            });

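            // Attempt to emit a batch up to the current frontier. Updates at or beyond
            // the frontier stay buffered for a later invocation.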
            if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut() {
                subscribe_protocol.send_batch(
                    frontier.clone(),
                    &mut rows_to_emit,
                    &mut errors_to_emit,
                );
            }

            if PartialOrder::less_equal(&up_to, &frontier) {
                finished = true;
                // We are done; indicate this by sending a batch at the
                // empty frontier.
                if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut()
                {
                    subscribe_protocol.send_batch(
                        Antichain::default(),
                        &mut Vec::new(),
                        &mut Vec::new(),
                    );
                }
            }
        }
    });
}

/// A type that guides the transmission of rows back to the coordinator.
///
/// A protocol instance may `send` rows indefinitely in response to `send_batch` calls.
/// A `send_batch` call advancing the upper to the empty frontier is used to indicate the end of
/// a stream. If the stream is not advanced to the empty frontier, the `Drop` implementation sends
/// an indication that the protocol has finished without completion.
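///
/// A rough sketch of the intended call sequence (illustrative only; `protocol`, `rows`,
/// `errors`, and the timestamps are hypothetical):
///
/// ```ignore
/// // Ship everything before time 5, then everything before time 10, ...
/// protocol.send_batch(Antichain::from_elem(5.into()), &mut rows, &mut errors);
/// protocol.send_batch(Antichain::from_elem(10.into()), &mut rows, &mut errors);
/// // ... then signal completion with a batch at the empty frontier.
/// protocol.send_batch(Antichain::new(), &mut Vec::new(), &mut Vec::new());
/// ```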
struct SubscribeProtocol {
    pub sink_id: GlobalId,
    pub sink_as_of: Antichain<Timestamp>,
    pub subscribe_response_buffer: Option<Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>>,
    pub prev_upper: Antichain<Timestamp>,
    /// The error poisoning this subscribe, if any.
    ///
    /// As soon as a subscribe has encountered an error, it is poisoned: It will only return the
    /// same error in subsequent batches, until it is dropped. The subscribe protocol currently
    /// does not support retracting errors (database-issues#5182).
    pub poison: Option<String>,
}

impl SubscribeProtocol {
    /// Attempt to send a batch of rows with the given `upper`.
    ///
    /// This method filters the updates to send based on the provided `upper`. Updates are only
    /// sent when their times are before `upper`. If `upper` has not advanced far enough, no batch
    /// will be sent. `rows` and `errors` that have been sent are drained from their respective
    /// vectors; only entries that have not been sent remain after the call returns. The caller is
    /// expected to re-submit these entries, potentially along with new ones, at a later `upper`.
    ///
    /// The subscribe protocol only supports reporting a single error. Because of this, it will
    /// only actually send the first error received in a `SubscribeResponse`. Subsequent errors are
    /// dropped. To simplify life for the caller, this method still maintains the illusion that
    /// `errors` are handled the same way as `rows`.
    fn send_batch(
        &mut self,
        upper: Antichain<Timestamp>,
        rows: &mut Vec<(Timestamp, Row, Diff)>,
        errors: &mut Vec<(Timestamp, DataflowError, Diff)>,
    ) {
        // Only send a batch if both conditions hold:
        //  a) `upper` has reached or passed the sink's `as_of` frontier.
        //  b) `upper` is different from when we last sent a batch.
        if !PartialOrder::less_equal(&self.sink_as_of, &upper) || upper == self.prev_upper {
            return;
        }

        // The compute protocol requires us to only send out consolidated batches.
        consolidate_updates(rows);
        consolidate_updates(errors);

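        // Ship updates whose times are before `upper`; keep the rest buffered in
        // `rows` and `errors` so the caller can re-submit them at a later `upper`.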
        let (keep_rows, ship_rows) = rows.drain(..).partition(|u| upper.less_equal(&u.0));
        let (keep_errors, ship_errors) = errors.drain(..).partition(|u| upper.less_equal(&u.0));
        *rows = keep_rows;
        *errors = keep_errors;

        let updates = match (&self.poison, ship_errors.first()) {
            (Some(error), _) => {
                // The subscribe is poisoned; keep sending the same error.
                Err(error.clone())
            }
            (None, Some((_, error, _))) => {
                // The subscribe encountered its first error; poison it.
                let error = error.to_string();
                self.poison = Some(error.clone());
                Err(error)
            }
            (None, None) => {
                // No error encountered so far; ship the rows we have!
                Ok(ship_rows)
            }
        };

        let buffer = self
            .subscribe_response_buffer
            .as_mut()
            .expect("The subscribe response buffer is only cleared on drop.");

        buffer.borrow_mut().push((
            self.sink_id,
            SubscribeResponse::Batch(SubscribeBatch {
                lower: self.prev_upper.clone(),
                upper: upper.clone(),
                updates,
            }),
        ));

        let input_exhausted = upper.is_empty();
        self.prev_upper = upper;
        if input_exhausted {
            // The dataflow's input has been exhausted; clear the channel,
            // to avoid sending `SubscribeResponse::DroppedAt`.
            self.subscribe_response_buffer = None;
        }
    }
}

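// On drop, report that the subscribe was dropped before completion, unless the
// response buffer was already cleared by a final (empty-frontier) batch.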
impl Drop for SubscribeProtocol {
    fn drop(&mut self) {
        if let Some(buffer) = self.subscribe_response_buffer.take() {
            buffer.borrow_mut().push((
                self.sink_id,
                SubscribeResponse::DroppedAt(self.prev_upper.clone()),
            ));
        }
    }
}