1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Coordinator-side support for the TAIL protocol: relaying tail responses from the dataflow layer to clients.
use tokio::sync::mpsc;
use mz_compute_client::response::{TailBatch, TailResponse};
use mz_repr::adt::numeric;
use mz_repr::{Datum, Row};
use crate::coord::peek::PeekResponseUnary;
/// The coordinator's view of a TAIL that has not yet finished producing output.
pub(crate) struct PendingTail {
    /// If set, progress rows (marking completed timestamps) are emitted in
    /// addition to data rows.
    emit_progress: bool,
    /// Arity of the sink relation, i.e. the number of table columns, not counting
    /// the timestamp/progress/diff columns that TAIL prepends.
    arity: usize,
    /// Sender over which finalized output is delivered to the client.
    ///
    /// Output is currently framed as `PeekResponseUnary`; `TailResponse` may be a
    /// better fit eventually.
    channel: mpsc::UnboundedSender<PeekResponseUnary>,
}
impl PendingTail {
/// Create a new [PendingTail].
/// * The `channel` receives batches of finalized PeekResponses.
/// * If `emit_progress` is true, the finalized rows are either data or progress updates
/// * `arity` is the arity of the sink relation.
pub(crate) fn new(
channel: mpsc::UnboundedSender<PeekResponseUnary>,
emit_progress: bool,
arity: usize,
) -> Self {
Self {
channel,
emit_progress,
arity,
}
}
/// Process a tail response
///
/// Returns `true` if the sink should be removed.
pub(crate) fn process_response(&mut self, response: TailResponse) -> bool {
let mut row_buf = Row::default();
match response {
TailResponse::Batch(TailBatch {
lower: _,
upper,
updates: mut rows,
}) => {
// Sort results by time. We use stable sort here because it will produce deterministic
// results since the cursor will always produce rows in the same order.
// TODO: Is sorting necessary?
rows.sort_by_key(|(time, _, _)| *time);
let rows = rows
.into_iter()
.map(|(time, row, diff)| {
let mut packer = row_buf.packer();
// TODO: Change to MzTimestamp.
packer.push(Datum::from(numeric::Numeric::from(time)));
if self.emit_progress {
// When sinking with PROGRESS, the output
// includes an additional column that
// indicates whether a timestamp is
// complete. For regular "data" updates this
// is always `false`.
packer.push(Datum::False);
}
packer.push(Datum::Int64(diff));
packer.extend_by_row(&row);
row_buf.clone()
})
.collect();
// TODO(benesch): the lack of backpressure here can result in
// unbounded memory usage.
let result = self.channel.send(PeekResponseUnary::Rows(rows));
if result.is_err() {
// TODO(benesch): we should actually drop the sink if the
// receiver has gone away. E.g. form a DROP SINK command?
}
if self.emit_progress && !upper.is_empty() {
assert_eq!(
upper.len(),
1,
"TAIL only supports single-dimensional timestamps"
);
let mut packer = row_buf.packer();
packer.push(Datum::from(numeric::Numeric::from(*&upper[0])));
packer.push(Datum::True);
// Fill in the diff column and all table columns with NULL.
for _ in 0..(self.arity + 1) {
packer.push(Datum::Null);
}
let result = self.channel.send(PeekResponseUnary::Rows(vec![row_buf]));
if result.is_err() {
// TODO(benesch): we should actually drop the sink if the
// receiver has gone away. E.g. form a DROP SINK command?
}
}
upper.is_empty()
}
TailResponse::DroppedAt(_frontier) => {
// TODO: Could perhaps do this earlier, in response to DROP SINK.
true
}
}
}
}