// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::any::Any;
use std::cell::RefCell;
use std::ops::DerefMut;
use std::rc::Rc;

use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::Collection;
use mz_compute_client::protocol::response::{SubscribeBatch, SubscribeResponse};
use mz_compute_types::sinks::{ComputeSinkDesc, SubscribeSinkConnection};
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::Scope;
use timely::progress::timestamp::Timestamp as TimelyTimestamp;
use timely::progress::Antichain;
use timely::PartialOrder;

use crate::render::sinks::SinkRender;
use crate::render::StartSignal;

impl<G> SinkRender<G> for SubscribeSinkConnection
where
    G: Scope<Timestamp = Timestamp>,
{
    fn render_sink(
        &self,
        compute_state: &mut crate::compute_state::ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<Timestamp>,
        _start_signal: StartSignal,
        sinked_collection: Collection<G, Row, Diff>,
        err_collection: Collection<G, DataflowError, Diff>,
    ) -> Option<Rc<dyn Any>> {
        // An encapsulation of the Subscribe response protocol.
        // Used to send rows and progress messages, and to alert the
        // coordinator if the dataflow was dropped before completing.
        let subscribe_protocol_handle = Rc::new(RefCell::new(Some(SubscribeProtocol {
            sink_id,
            sink_as_of: as_of.clone(),
            subscribe_response_buffer: Some(Rc::clone(&compute_state.subscribe_response_buffer)),
            prev_upper: Antichain::from_elem(Timestamp::minimum()),
            poison: None,
        })));
        let subscribe_protocol_weak = Rc::downgrade(&subscribe_protocol_handle);

        subscribe(
            sinked_collection,
            err_collection,
            sink_id,
            sink.with_snapshot,
            as_of,
            sink.up_to.clone(),
            subscribe_protocol_handle,
        );

        // Inform the coordinator that we have been dropped,
        // and destroy the subscribe protocol so the sink operator
        // can't send spurious messages while shutting down.
        Some(Rc::new(scopeguard::guard((), move |_| {
            if let Some(subscribe_protocol_handle) = subscribe_protocol_weak.upgrade() {
                std::mem::drop(subscribe_protocol_handle.borrow_mut().take())
            }
        })))
    }
}

fn subscribe<G>(
    sinked_collection: Collection<G, Row, Diff>,
    err_collection: Collection<G, DataflowError, Diff>,
    sink_id: GlobalId,
    with_snapshot: bool,
    as_of: Antichain<G::Timestamp>,
    up_to: Antichain<G::Timestamp>,
    subscribe_protocol_handle: Rc<RefCell<Option<SubscribeProtocol>>>,
) where
    G: Scope<Timestamp = Timestamp>,
{
    let name = format!("subscribe-{}", sink_id);
    let mut op = OperatorBuilder::new(name, sinked_collection.scope());
    let mut ok_input = op.new_input(&sinked_collection.inner, Pipeline);
    let mut err_input = op.new_input(&err_collection.inner, Pipeline);

    op.build(|_cap| {
        let mut rows_to_emit = Vec::new();
        let mut errors_to_emit = Vec::new();
        let mut finished = false;
        let mut ok_buf = Default::default();
        let mut err_buf = Default::default();

        move |frontiers| {
            if finished {
                // Drain the inputs, to avoid the operator being constantly rescheduled
                ok_input.for_each(|_, _| {});
                err_input.for_each(|_, _| {});
                return;
            }

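            // Merge the two input frontiers into the operator's overall frontier
            // (the minimal elements across both inputs).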
            let mut frontier = Antichain::new();
            for input_frontier in frontiers {
                frontier.extend(input_frontier.frontier().iter().copied());
            }

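            // An update is emitted only if its time falls in the window bounded below by
            // `as_of` (inclusive when a snapshot was requested, exclusive otherwise) and
            // bounded above, exclusively, by `up_to`.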
            let should_emit = |time: &Timestamp| {
                let beyond_as_of = if with_snapshot {
                    as_of.less_equal(time)
                } else {
                    as_of.less_than(time)
                };
                let before_up_to = !up_to.less_equal(time);
                beyond_as_of && before_up_to
            };

            ok_input.for_each(|_, data| {
                data.swap(&mut ok_buf);
                for (row, time, diff) in ok_buf.drain(..) {
                    if should_emit(&time) {
                        rows_to_emit.push((time, row, diff));
                    }
                }
            });
            err_input.for_each(|_, data| {
                data.swap(&mut err_buf);
                for (error, time, diff) in err_buf.drain(..) {
                    if should_emit(&time) {
                        errors_to_emit.push((time, error, diff));
                    }
                }
            });

            if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut() {
                subscribe_protocol.send_batch(
                    frontier.clone(),
                    &mut rows_to_emit,
                    &mut errors_to_emit,
                );
            }

            if PartialOrder::less_equal(&up_to, &frontier) {
                finished = true;
                // We are done; indicate this by sending a batch at the
                // empty frontier.
                if let Some(subscribe_protocol) = subscribe_protocol_handle.borrow_mut().deref_mut()
                {
                    subscribe_protocol.send_batch(
                        Antichain::default(),
                        &mut Vec::new(),
                        &mut Vec::new(),
                    );
                }
            }
        }
    });
}

/// A type that guides the transmission of rows back to the coordinator.
///
/// A protocol instance may send rows indefinitely in response to `send_batch` calls.
/// A `send_batch` call advancing the upper to the empty frontier is used to indicate the end of
/// a stream. If the stream is not advanced to the empty frontier, the `Drop` implementation sends
/// an indication that the protocol has finished without completion.
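///
/// A rough sketch of a typical exchange (illustrative only; `protocol`, `rows`, and `errors`
/// are stand-in names, and the timestamps are made up):
///
/// ```ignore
/// // Ship all buffered updates with times < 5, then those with times in [5, 9).
/// protocol.send_batch(Antichain::from_elem(5.into()), &mut rows, &mut errors);
/// protocol.send_batch(Antichain::from_elem(9.into()), &mut rows, &mut errors);
/// // An empty `upper` marks the end of the stream.
/// protocol.send_batch(Antichain::new(), &mut Vec::new(), &mut Vec::new());
/// ```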
struct SubscribeProtocol {
    pub sink_id: GlobalId,
    pub sink_as_of: Antichain<Timestamp>,
    pub subscribe_response_buffer: Option<Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>>,
    pub prev_upper: Antichain<Timestamp>,
    /// The error poisoning this subscribe, if any.
    ///
    /// As soon as a subscribe has encountered an error, it is poisoned: It will only return the
    /// same error in subsequent batches, until it is dropped. The subscribe protocol currently
    /// does not support retracting errors (#17781).
    pub poison: Option<String>,
}

impl SubscribeProtocol {
    /// Attempt to send a batch of rows with the given `upper`.
    ///
    /// This method filters the updates to send based on the provided `upper`. Updates are only
    /// sent when their times are before `upper`. If `upper` has not advanced far enough, no batch
    /// will be sent. `rows` and `errors` that have been sent are drained from their respective
    /// vectors; only entries that have not been sent remain after the call returns. The caller is
    /// expected to re-submit these entries, potentially along with new ones, at a later `upper`.
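    ///
    /// For example (with illustrative timestamps): if `rows` holds updates at times 3, 7, and 9
    /// and `upper` is `{7}`, only the update at time 3 is shipped; the updates at times 7 and 9
    /// remain in `rows` for a later call.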
    ///
    /// The subscribe protocol only supports reporting a single error. Because of this, it will
    /// only actually send the first error received in a `SubscribeResponse`. Subsequent errors are
    /// dropped. To simplify life for the caller, this method still maintains the illusion that
    /// `errors` are handled the same way as `rows`.
    fn send_batch(
        &mut self,
        upper: Antichain<Timestamp>,
        rows: &mut Vec<(Timestamp, Row, Diff)>,
        errors: &mut Vec<(Timestamp, DataflowError, Diff)>,
    ) {
        // Only send a batch if both conditions hold:
        //  a) `upper` has reached or passed the sink's `as_of` frontier.
        //  b) `upper` is different from when we last sent a batch.
        if !PartialOrder::less_equal(&self.sink_as_of, &upper) || upper == self.prev_upper {
            return;
        }

        // The compute protocol requires us to only send out consolidated batches.
        consolidate_updates(rows);
        consolidate_updates(errors);

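        // Ship updates with times strictly before `upper`; keep everything at or beyond `upper`
        // buffered for a future call with a later `upper`.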
        let (keep_rows, ship_rows) = rows.drain(..).partition(|u| upper.less_equal(&u.0));
        let (keep_errors, ship_errors) = errors.drain(..).partition(|u| upper.less_equal(&u.0));
        *rows = keep_rows;
        *errors = keep_errors;

        let updates = match (&self.poison, ship_errors.first()) {
            (Some(error), _) => {
                // The subscribe is poisoned; keep sending the same error.
                Err(error.clone())
            }
            (None, Some((_, error, _))) => {
                // The subscribe encountered its first error; poison it.
                let error = error.to_string();
                self.poison = Some(error.clone());
                Err(error)
            }
            (None, None) => {
                // No error encountered so far; ship the rows we have!
                Ok(ship_rows)
            }
        };

        let buffer = self
            .subscribe_response_buffer
            .as_mut()
            .expect("The subscribe response buffer is only cleared on drop.");

        buffer.borrow_mut().push((
            self.sink_id,
            SubscribeResponse::Batch(SubscribeBatch {
                lower: self.prev_upper.clone(),
                upper: upper.clone(),
                updates,
            }),
        ));

        let input_exhausted = upper.is_empty();
        self.prev_upper = upper;
        if input_exhausted {
            // The dataflow's input has been exhausted; clear the channel,
            // to avoid sending `SubscribeResponse::DroppedAt`.
            self.subscribe_response_buffer = None;
        }
    }
}

impl Drop for SubscribeProtocol {
    fn drop(&mut self) {
        if let Some(buffer) = self.subscribe_response_buffer.take() {
            buffer.borrow_mut().push((
                self.sink_id,
                SubscribeResponse::DroppedAt(self.prev_upper.clone()),
            ));
        }
    }
}