1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Support for the TAIL protocol: forwarding responses from the dataflow layer to the client.

use tokio::sync::mpsc;

use mz_compute_client::response::{TailBatch, TailResponse};
use mz_repr::adt::numeric;
use mz_repr::{Datum, Row};

use crate::coord::peek::PeekResponseUnary;

/// A description of a pending tail from coord's perspective.
///
/// Holds everything `process_response` needs to turn a `TailResponse` into
/// rows for the client: where to send them and how to shape them.
pub(crate) struct PendingTail {
    /// Channel to send responses to the client
    ///
    /// The responses have the form `PeekResponseUnary` but should perhaps become `TailResponse`.
    /// The channel is unbounded, so sending never waits for the receiver.
    channel: mpsc::UnboundedSender<PeekResponseUnary>,
    /// Whether progress information should be emitted
    ///
    /// When set, every data row carries an extra boolean column (always
    /// `false` for data), and a dedicated progress row (with that column set
    /// to `true`) is sent after each batch whose upper frontier is non-empty.
    emit_progress: bool,
    /// Number of columns in the output
    ///
    /// This counts the columns of the sink relation only, not the timestamp,
    /// progress, and diff columns that are prepended to each row. It is used
    /// to pad progress rows with the right number of `NULL`s.
    arity: usize,
}

impl PendingTail {
    /// Create a new [PendingTail].
    /// * The `channel` receives batches of finalized PeekResponses.
    /// * If `emit_progress` is true, the finalized rows are either data or progress updates
    /// * `arity` is the arity of the sink relation.
    pub(crate) fn new(
        channel: mpsc::UnboundedSender<PeekResponseUnary>,
        emit_progress: bool,
        arity: usize,
    ) -> Self {
        Self {
            channel,
            emit_progress,
            arity,
        }
    }

    /// Process a tail response
    ///
    /// Returns `true` if the sink should be removed.
    pub(crate) fn process_response(&mut self, response: TailResponse) -> bool {
        let mut row_buf = Row::default();
        match response {
            TailResponse::Batch(TailBatch {
                lower: _,
                upper,
                updates: mut rows,
            }) => {
                // Sort results by time. We use stable sort here because it will produce deterministic
                // results since the cursor will always produce rows in the same order.
                // TODO: Is sorting necessary?
                rows.sort_by_key(|(time, _, _)| *time);

                let rows = rows
                    .into_iter()
                    .map(|(time, row, diff)| {
                        let mut packer = row_buf.packer();
                        // TODO: Change to MzTimestamp.
                        packer.push(Datum::from(numeric::Numeric::from(time)));
                        if self.emit_progress {
                            // When sinking with PROGRESS, the output
                            // includes an additional column that
                            // indicates whether a timestamp is
                            // complete. For regular "data" updates this
                            // is always `false`.
                            packer.push(Datum::False);
                        }

                        packer.push(Datum::Int64(diff));

                        packer.extend_by_row(&row);

                        row_buf.clone()
                    })
                    .collect();
                // TODO(benesch): the lack of backpressure here can result in
                // unbounded memory usage.
                let result = self.channel.send(PeekResponseUnary::Rows(rows));
                if result.is_err() {
                    // TODO(benesch): we should actually drop the sink if the
                    // receiver has gone away. E.g. form a DROP SINK command?
                }

                if self.emit_progress && !upper.is_empty() {
                    assert_eq!(
                        upper.len(),
                        1,
                        "TAIL only supports single-dimensional timestamps"
                    );
                    let mut packer = row_buf.packer();
                    packer.push(Datum::from(numeric::Numeric::from(*&upper[0])));
                    packer.push(Datum::True);
                    // Fill in the diff column and all table columns with NULL.
                    for _ in 0..(self.arity + 1) {
                        packer.push(Datum::Null);
                    }

                    let result = self.channel.send(PeekResponseUnary::Rows(vec![row_buf]));
                    if result.is_err() {
                        // TODO(benesch): we should actually drop the sink if the
                        // receiver has gone away. E.g. form a DROP SINK command?
                    }
                }
                upper.is_empty()
            }
            TailResponse::DroppedAt(_frontier) => {
                // TODO: Could perhaps do this earlier, in response to DROP SINK.
                true
            }
        }
    }
}