// mz_compute/sink/subscribe.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::any::Any;
use std::cell::RefCell;
use std::ops::DerefMut;
use std::rc::Rc;
use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::Collection;
use mz_compute_client::protocol::response::{SubscribeBatch, SubscribeResponse};
use mz_compute_types::sinks::{ComputeSinkDesc, SubscribeSinkConnection};
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::Scope;
use timely::progress::timestamp::Timestamp as TimelyTimestamp;
use timely::progress::Antichain;
use timely::PartialOrder;
use crate::render::sinks::SinkRender;
use crate::render::StartSignal;
impl<G> SinkRender<G> for SubscribeSinkConnection
where
    G: Scope<Timestamp = Timestamp>,
{
    /// Renders the subscribe sink operator for `sinked_collection` and `err_collection`.
    ///
    /// Returns a token whose drop destroys the sink's [`SubscribeProtocol`], which in turn
    /// reports the drop through the subscribe response buffer unless the stream had already
    /// been completed.
    fn render_sink(
        &self,
        compute_state: &mut crate::compute_state::ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<Timestamp>,
        _start_signal: StartSignal,
        sinked_collection: Collection<G, Row, Diff>,
        err_collection: Collection<G, DataflowError, Diff>,
        _ct_times: Option<Collection<G, (), Diff>>,
    ) -> Option<Rc<dyn Any>> {
        // The protocol state encapsulates how rows and progress are reported, and it
        // signals (on drop) when the dataflow went away before the stream completed.
        let protocol = SubscribeProtocol {
            sink_id,
            sink_as_of: as_of.clone(),
            subscribe_response_buffer: Some(Rc::clone(&compute_state.subscribe_response_buffer)),
            prev_upper: Antichain::from_elem(Timestamp::minimum()),
            poison: None,
        };

        // The operator owns the strong handle; the shutdown token only holds a weak one.
        let shared_protocol = Rc::new(RefCell::new(Some(protocol)));
        let weak_protocol = Rc::downgrade(&shared_protocol);

        subscribe(
            sinked_collection,
            err_collection,
            sink_id,
            sink.with_snapshot,
            as_of,
            sink.up_to.clone(),
            shared_protocol,
        );

        // When the token is dropped, take the protocol out of the shared slot and drop it.
        // This informs the coordinator that we have been dropped and guarantees the sink
        // operator cannot send spurious messages while shutting down.
        let shutdown_guard = scopeguard::guard((), move |_| {
            if let Some(shared) = weak_protocol.upgrade() {
                let taken = shared.borrow_mut().take();
                std::mem::drop(taken);
            }
        });
        let token: Rc<dyn Any> = Rc::new(shutdown_guard);
        Some(token)
    }
}
/// Builds the `subscribe-{sink_id}` operator that feeds a [`SubscribeProtocol`].
///
/// The operator buffers updates from `sinked_collection` and `err_collection` whose times
/// lie in the emission window (at or beyond `as_of` when `with_snapshot` is set, strictly
/// beyond it otherwise, and not beyond `up_to`). On every activation it hands the buffered
/// updates and the combined input frontier to the protocol. Once the frontier reaches
/// `up_to`, it sends a final batch at the empty frontier and from then on only drains its
/// inputs.
fn subscribe<G>(
    sinked_collection: Collection<G, Row, Diff>,
    err_collection: Collection<G, DataflowError, Diff>,
    sink_id: GlobalId,
    with_snapshot: bool,
    as_of: Antichain<G::Timestamp>,
    up_to: Antichain<G::Timestamp>,
    subscribe_protocol_handle: Rc<RefCell<Option<SubscribeProtocol>>>,
) where
    G: Scope<Timestamp = Timestamp>,
{
    let mut builder =
        OperatorBuilder::new(format!("subscribe-{}", sink_id), sinked_collection.scope());
    let mut oks = builder.new_input(&sinked_collection.inner, Pipeline);
    let mut errs = builder.new_input(&err_collection.inner, Pipeline);

    builder.build(|_cap| {
        let mut pending_rows = Vec::new();
        let mut pending_errors = Vec::new();
        let mut done = false;

        move |frontiers| {
            if done {
                // Drain the inputs, to avoid the operator being constantly rescheduled
                oks.for_each(|_, _| {});
                errs.for_each(|_, _| {});
                return;
            }

            // Combine the frontiers of both inputs into a single antichain.
            let mut combined = Antichain::new();
            for input in frontiers {
                combined.extend(input.frontier().iter().copied());
            }

            // Decides whether an update at `time` falls into the emission window.
            let in_window = |time: &Timestamp| {
                let past_as_of = if with_snapshot {
                    as_of.less_equal(time)
                } else {
                    as_of.less_than(time)
                };
                past_as_of && !up_to.less_equal(time)
            };

            // Buffer the in-window updates, re-ordered to `(time, data, diff)`.
            oks.for_each(|_, data| {
                pending_rows.extend(
                    data.drain(..)
                        .filter(|(_, time, _)| in_window(time))
                        .map(|(row, time, diff)| (time, row, diff)),
                );
            });
            errs.for_each(|_, data| {
                pending_errors.extend(
                    data.drain(..)
                        .filter(|(_, time, _)| in_window(time))
                        .map(|(error, time, diff)| (time, error, diff)),
                );
            });

            let reached_up_to = PartialOrder::less_equal(&up_to, &combined);

            // The protocol slot is empty once the sink token has been dropped; in that
            // case nothing is reported anymore.
            let mut protocol_slot = subscribe_protocol_handle.borrow_mut();
            if let Some(protocol) = protocol_slot.deref_mut() {
                protocol.send_batch(combined, &mut pending_rows, &mut pending_errors);
                if reached_up_to {
                    // We are done; indicate this by sending a batch at the
                    // empty frontier.
                    protocol.send_batch(Antichain::default(), &mut Vec::new(), &mut Vec::new());
                }
            }
            done = reached_up_to;
        }
    });
}
/// A type that guides the transmission of rows back to the coordinator.
///
/// A protocol instance may send rows indefinitely in response to `send_batch` calls. Each
/// batch that is sent is pushed into the subscribe response buffer, with its `lower` set to
/// the `upper` of the previously sent batch.
///
/// A `send_batch` call advancing the upper to the empty frontier is used to indicate the end of
/// a stream. If the stream is not advanced to the empty frontier, the `Drop` implementation sends
/// an indication that the protocol has finished without completion
/// (`SubscribeResponse::DroppedAt`).
struct SubscribeProtocol {
/// The ID of the sink; attached to every response pushed into the response buffer.
pub sink_id: GlobalId,
/// The `as_of` frontier of the sink. No batch is sent until the reported `upper` has
/// reached or passed this frontier.
pub sink_as_of: Antichain<Timestamp>,
/// The buffer into which responses are pushed.
///
/// Set to `None` once a batch at the empty frontier has been sent, which prevents the `Drop`
/// implementation from pushing a `SubscribeResponse::DroppedAt`.
pub subscribe_response_buffer: Option<Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>>,
/// The `upper` of the most recently sent batch. Used as the `lower` of the next batch and
/// reported in `SubscribeResponse::DroppedAt` on drop.
pub prev_upper: Antichain<Timestamp>,
/// The error poisoning this subscribe, if any.
///
/// As soon as a subscribe has encountered an error, it is poisoned: It will only return the
/// same error in subsequent batches, until it is dropped. The subscribe protocol currently
/// does not support retracting errors (database-issues#5182).
pub poison: Option<String>,
}
impl SubscribeProtocol {
/// Attempt to send a batch of rows with the given `upper`.
///
/// This method filters the updates to send based on the provided `upper`. Updates are only
/// sent when their times are before `upper`. If `upper` has not advanced far enough, no batch
/// will be sent. `rows` and `errors` that have been sent are drained from their respective
/// vectors, only entries that have not been sent remain after the call returns. The caller is
/// expected to re-submit these entries, potentially along with new ones, at a later `upper`.
///
/// The subscribe protocol only supports reporting a single error. Because of this, it will
/// only actually send the first error received in a `SubscribeResponse`. Subsequent errors are
/// dropped. To simplify life for the caller, this method still maintains the illusion that
/// `errors` are handled the same way as `rows`.
fn send_batch(
&mut self,
upper: Antichain<Timestamp>,
rows: &mut Vec<(Timestamp, Row, Diff)>,
errors: &mut Vec<(Timestamp, DataflowError, Diff)>,
) {
// Only send a batch if both conditions hold:
// a) `upper` has reached or passed the sink's `as_of` frontier.
// b) `upper` is different from when we last sent a batch.
if !PartialOrder::less_equal(&self.sink_as_of, &upper) || upper == self.prev_upper {
return;
}
// The compute protocol requires us to only send out consolidated batches.
consolidate_updates(rows);
consolidate_updates(errors);
let (keep_rows, ship_rows) = rows.drain(..).partition(|u| upper.less_equal(&u.0));
let (keep_errors, ship_errors) = errors.drain(..).partition(|u| upper.less_equal(&u.0));
*rows = keep_rows;
*errors = keep_errors;
let updates = match (&self.poison, ship_errors.first()) {
(Some(error), _) => {
// The subscribe is poisoned; keep sending the same error.
Err(error.clone())
}
(None, Some((_, error, _))) => {
// The subscribe encountered its first error; poison it.
let error = error.to_string();
self.poison = Some(error.clone());
Err(error)
}
(None, None) => {
// No error encountered so for; ship the rows we have!
Ok(ship_rows)
}
};
let buffer = self
.subscribe_response_buffer
.as_mut()
.expect("The subscribe response buffer is only cleared on drop.");
buffer.borrow_mut().push((
self.sink_id,
SubscribeResponse::Batch(SubscribeBatch {
lower: self.prev_upper.clone(),
upper: upper.clone(),
updates,
}),
));
let input_exhausted = upper.is_empty();
self.prev_upper = upper;
if input_exhausted {
// The dataflow's input has been exhausted; clear the channel,
// to avoid sending `SubscribeResponse::DroppedAt`.
self.subscribe_response_buffer = None;
}
}
}
impl Drop for SubscribeProtocol {
    /// Reports that the subscribe was dropped before its stream completed.
    ///
    /// If the response buffer is still attached (i.e. no batch at the empty frontier has
    /// been sent), a `SubscribeResponse::DroppedAt` carrying the last sent upper is pushed.
    fn drop(&mut self) {
        let Some(buffer) = self.subscribe_response_buffer.take() else {
            // The stream was completed; there is nothing left to report.
            return;
        };
        let dropped_at = SubscribeResponse::DroppedAt(self.prev_upper.clone());
        buffer.borrow_mut().push((self.sink_id, dropped_at));
    }
}