mz_compute/sink/subscribe.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::any::Any;
use std::cell::RefCell;
use std::ops::DerefMut;
use std::rc::Rc;
use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::Collection;
use mz_compute_client::protocol::response::{SubscribeBatch, SubscribeResponse};
use mz_compute_types::sinks::{ComputeSinkDesc, SubscribeSinkConnection};
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::Scope;
use timely::progress::timestamp::Timestamp as TimelyTimestamp;
use timely::progress::Antichain;
use timely::PartialOrder;
use crate::render::sinks::SinkRender;
use crate::render::StartSignal;
impl<G> SinkRender<G> for SubscribeSinkConnection
where
    G: Scope<Timestamp = Timestamp>,
{
    /// Renders a subscribe sink over `sinked_collection` and `err_collection`.
    ///
    /// A [`SubscribeProtocol`] is created for `sink_id`, wired to the response buffer held by
    /// `compute_state`, and handed to the `subscribe` operator. The returned token is a guard
    /// that, when dropped, takes the protocol out of its shared cell and drops it, so the sink
    /// operator has nothing left to send through afterwards.
    ///
    /// `_start_signal` and `_ct_times` are not used by this sink.
    fn render_sink(
        &self,
        compute_state: &mut crate::compute_state::ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<Timestamp>,
        _start_signal: StartSignal,
        sinked_collection: Collection<G, Row, Diff>,
        err_collection: Collection<G, DataflowError, Diff>,
        _ct_times: Option<Collection<G, (), Diff>>,
    ) -> Option<Rc<dyn Any>> {
        // The protocol state machine: it ships rows and progress information into the shared
        // response buffer, and reports on drop if the stream never ran to completion.
        let protocol = SubscribeProtocol {
            sink_id,
            sink_as_of: as_of.clone(),
            subscribe_response_buffer: Some(Rc::clone(&compute_state.subscribe_response_buffer)),
            prev_upper: Antichain::from_elem(Timestamp::minimum()),
            poison: None,
        };

        // The operator owns the strong reference; the shutdown guard only keeps a weak one.
        let shared_protocol = Rc::new(RefCell::new(Some(protocol)));
        let weak_protocol = Rc::downgrade(&shared_protocol);

        subscribe(
            sinked_collection,
            err_collection,
            sink_id,
            sink.with_snapshot,
            as_of,
            sink.up_to.clone(),
            shared_protocol,
        );

        // Dropping the token tears the protocol down: the coordinator is told that the
        // subscribe was dropped, and the operator can no longer emit spurious messages while
        // the dataflow shuts down.
        let shutdown_token = scopeguard::guard((), move |_| {
            let Some(shared_protocol) = weak_protocol.upgrade() else {
                // The operator (and with it the protocol) is already gone.
                return;
            };
            drop(shared_protocol.borrow_mut().take());
        });
        Some(Rc::new(shutdown_token))
    }
}
/// Builds the `subscribe-{sink_id}` operator that feeds updates into the subscribe protocol.
///
/// On every activation the operator buffers the updates from both inputs whose times lie
/// inside the requested window, then offers the buffered updates together with the combined
/// input frontier to the protocol held in `subscribe_protocol_handle`. If the handle holds
/// `None`, nothing is sent.
///
/// * `with_snapshot`: when `true`, updates at times the `as_of` frontier is less-or-equal to
///   are emitted; when `false`, only updates at times strictly beyond `as_of` are.
/// * `up_to`: updates at times at or beyond this frontier are discarded. Once the combined
///   input frontier has reached `up_to`, a final batch with the empty upper is sent and the
///   operator merely drains its inputs from then on.
fn subscribe<G>(
    sinked_collection: Collection<G, Row, Diff>,
    err_collection: Collection<G, DataflowError, Diff>,
    sink_id: GlobalId,
    with_snapshot: bool,
    as_of: Antichain<G::Timestamp>,
    up_to: Antichain<G::Timestamp>,
    subscribe_protocol_handle: Rc<RefCell<Option<SubscribeProtocol>>>,
) where
    G: Scope<Timestamp = Timestamp>,
{
    let mut builder = OperatorBuilder::new(
        format!("subscribe-{}", sink_id),
        sinked_collection.scope(),
    );
    let mut ok_input = builder.new_input(&sinked_collection.inner, Pipeline);
    let mut err_input = builder.new_input(&err_collection.inner, Pipeline);

    builder.build(|_cap| {
        // Updates accepted from the inputs but not yet shipped by the protocol.
        let mut pending_rows = Vec::new();
        let mut pending_errors = Vec::new();
        // Set once the final (empty-frontier) batch has been requested.
        let mut finished = false;

        move |frontiers| {
            if finished {
                // Keep consuming input so the operator is not rescheduled over and over.
                ok_input.for_each(|_, _| {});
                err_input.for_each(|_, _| {});
                return;
            }

            // Merge the frontiers of all inputs into a single antichain.
            let mut frontier = Antichain::new();
            for input_frontier in frontiers {
                frontier.extend(input_frontier.frontier().iter().copied());
            }

            // An update is emitted only if its time is not at or beyond `up_to` and is
            // admitted by `as_of` (inclusively when a snapshot was requested).
            let should_emit = |time: &Timestamp| {
                if up_to.less_equal(time) {
                    return false;
                }
                if with_snapshot {
                    as_of.less_equal(time)
                } else {
                    as_of.less_than(time)
                }
            };

            ok_input.for_each(|_, data| {
                pending_rows.extend(
                    data.drain(..)
                        .filter(|(_, time, _)| should_emit(time))
                        .map(|(row, time, diff)| (time, row, diff)),
                );
            });
            err_input.for_each(|_, data| {
                pending_errors.extend(
                    data.drain(..)
                        .filter(|(_, time, _)| should_emit(time))
                        .map(|(error, time, diff)| (time, error, diff)),
                );
            });

            let reached_up_to = PartialOrder::less_equal(&up_to, &frontier);

            if let Some(protocol) = subscribe_protocol_handle.borrow_mut().deref_mut() {
                protocol.send_batch(frontier, &mut pending_rows, &mut pending_errors);
                if reached_up_to {
                    // We are done; signal completion with a batch at the empty frontier.
                    protocol.send_batch(Antichain::default(), &mut Vec::new(), &mut Vec::new());
                }
            }

            finished = reached_up_to;
        }
    });
}
/// A type that guides the transmission of rows back to the coordinator.
///
/// A protocol instance may `send` rows indefinitely in response to `send_batch` calls.
/// A `send_batch` call advancing the upper to the empty frontier is used to indicate the end of
/// a stream. If the stream is not advanced to the empty frontier, the `Drop` implementation sends
/// an indication that the protocol has finished without completion.
struct SubscribeProtocol {
/// The ID of the subscribe sink; paired with every response pushed into the response buffer.
pub sink_id: GlobalId,
/// The sink's `as_of` frontier. `send_batch` sends nothing until the offered `upper` has
/// reached or passed this frontier.
pub sink_as_of: Antichain<Timestamp>,
/// The shared buffer that responses are pushed into.
///
/// Set to `None` once a batch with the empty `upper` has been sent, which keeps the `Drop`
/// implementation from additionally reporting `SubscribeResponse::DroppedAt`.
pub subscribe_response_buffer: Option<Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>>,
/// The `upper` of the most recently sent batch. It becomes the `lower` of the next batch and
/// is the frontier reported by `SubscribeResponse::DroppedAt` on drop.
pub prev_upper: Antichain<Timestamp>,
/// The error poisoning this subscribe, if any.
///
/// As soon as a subscribe has encountered an error, it is poisoned: It will only return the
/// same error in subsequent batches, until it is dropped. The subscribe protocol currently
/// does not support retracting errors (database-issues#5182).
pub poison: Option<String>,
}
impl SubscribeProtocol {
/// Attempt to send a batch of rows with the given `upper`.
///
/// This method filters the updates to send based on the provided `upper`. Updates are only
/// sent when their times are before `upper`. If `upper` has not advanced far enough, no batch
/// will be sent. `rows` and `errors` that have been sent are drained from their respective
/// vectors, only entries that have not been sent remain after the call returns. The caller is
/// expected to re-submit these entries, potentially along with new ones, at a later `upper`.
///
/// The subscribe protocol only supports reporting a single error. Because of this, it will
/// only actually send the first error received in a `SubscribeResponse`. Subsequent errors are
/// dropped. To simplify life for the caller, this method still maintains the illusion that
/// `errors` are handled the same way as `rows`.
fn send_batch(
&mut self,
upper: Antichain<Timestamp>,
rows: &mut Vec<(Timestamp, Row, Diff)>,
errors: &mut Vec<(Timestamp, DataflowError, Diff)>,
) {
// Only send a batch if both conditions hold:
// a) `upper` has reached or passed the sink's `as_of` frontier.
// b) `upper` is different from when we last sent a batch.
if !PartialOrder::less_equal(&self.sink_as_of, &upper) || upper == self.prev_upper {
return;
}
// The compute protocol requires us to only send out consolidated batches.
consolidate_updates(rows);
consolidate_updates(errors);
let (keep_rows, ship_rows) = rows.drain(..).partition(|u| upper.less_equal(&u.0));
let (keep_errors, ship_errors) = errors.drain(..).partition(|u| upper.less_equal(&u.0));
*rows = keep_rows;
*errors = keep_errors;
let updates = match (&self.poison, ship_errors.first()) {
(Some(error), _) => {
// The subscribe is poisoned; keep sending the same error.
Err(error.clone())
}
(None, Some((_, error, _))) => {
// The subscribe encountered its first error; poison it.
let error = error.to_string();
self.poison = Some(error.clone());
Err(error)
}
(None, None) => {
// No error encountered so for; ship the rows we have!
Ok(ship_rows)
}
};
let buffer = self
.subscribe_response_buffer
.as_mut()
.expect("The subscribe response buffer is only cleared on drop.");
buffer.borrow_mut().push((
self.sink_id,
SubscribeResponse::Batch(SubscribeBatch {
lower: self.prev_upper.clone(),
upper: upper.clone(),
updates,
}),
));
let input_exhausted = upper.is_empty();
self.prev_upper = upper;
if input_exhausted {
// The dataflow's input has been exhausted; clear the channel,
// to avoid sending `SubscribeResponse::DroppedAt`.
self.subscribe_response_buffer = None;
}
}
}
impl Drop for SubscribeProtocol {
    /// Reports `SubscribeResponse::DroppedAt` with the last sent upper, unless the response
    /// buffer has already been released.
    fn drop(&mut self) {
        let Some(buffer) = self.subscribe_response_buffer.take() else {
            // The buffer was released earlier; there is nothing left to report.
            return;
        };
        let dropped = SubscribeResponse::DroppedAt(self.prev_upper.clone());
        buffer.borrow_mut().push((self.sink_id, dropped));
    }
}