1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::Data;
use timely::communication::Push;
use timely::dataflow::channels::Bundle;
use timely::dataflow::operators::generic::OutputHandle;
use timely::dataflow::operators::{Capability, InputCapability};
use timely::progress::Timestamp;

/// A buffer that consolidates updates
///
/// The buffer implements a wrapper around [OutputHandle] consolidating elements pushed to it. It is
/// backed by a capacity-limited buffer, which means that compaction only occurs within the
/// dimensions of the buffer, i.e. the number of unique keys is less than half of the buffer's
/// capacity.
///
/// A cap is retained whenever the current time changes to be able to flush on drop or when the time
/// changes again.
///
/// The buffer is filled with updates until it reaches its capacity. At this point, the updates are
/// consolidated to free up space. This process repeats until the consolidation recovered less than
/// half of the buffer's capacity, at which point the buffer will be shipped.
///
/// The buffer retains a capability to send data on flush. It will flush all data once dropped, if
/// time changes, or if the buffer capacity is reached.
pub struct ConsolidateBuffer<'a, 'b, T, D: Data, R: Semigroup, P>
where
    P: Push<Bundle<T, Vec<(D, T, R)>>> + 'a,
    T: Data + Timestamp + 'a,
    D: 'a,
{
    // a buffer for records, to send at self.cap
    // Invariant: Buffer only contains data if cap is Some.
    buffer: Vec<(D, T, R)>,
    output_handle: &'b mut OutputHandle<'a, T, (D, T, R), P>,
    cap: Option<Capability<T>>,
    port: usize,
    previous_len: usize,
}

impl<'a, 'b, T, D: Data, R: Semigroup, P> ConsolidateBuffer<'a, 'b, T, D, R, P>
where
    T: Data + Timestamp + 'a,
    P: Push<Bundle<T, Vec<(D, T, R)>>> + 'a,
{
    /// Create a new [ConsolidateBuffer], wrapping the provided session.
    ///
    /// * `output_handle`: The output to send data to.
    /// * 'port': The output port to retain capabilities for.
    pub fn new(output_handle: &'b mut OutputHandle<'a, T, (D, T, R), P>, port: usize) -> Self {
        Self {
            output_handle,
            port,
            cap: None,
            buffer: Vec::with_capacity(::timely::container::buffer::default_capacity::<(D, T, R)>()),
            previous_len: 0,
        }
    }

    #[inline]
    /// Provides an iterator of elements to the buffer
    pub fn give_iterator<I: Iterator<Item = (D, T, R)>>(
        &mut self,
        cap: &InputCapability<T>,
        iter: I,
    ) {
        for item in iter {
            self.give(cap, item);
        }
    }

    /// Give an element to the buffer
    pub fn give(&mut self, cap: &InputCapability<T>, data: (D, T, R)) {
        // Retain a cap for the current time, which will be used on flush.
        if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) {
            // Flush on capability change
            self.flush();
            // Retain capability for the specified output port.
            self.cap = Some(cap.delayed_for_output(cap.time(), self.port));
        }
        self.give_internal(data);
    }

    /// Give an element to the buffer, using a pre-fabricated capability. Note that the capability
    /// must be valid for the associated output.
    pub fn give_at(&mut self, cap: &Capability<T>, data: (D, T, R)) {
        // Retain a cap for the current time, which will be used on flush.
        if self.cap.as_ref().map_or(true, |t| t.time() != cap.time()) {
            // Flush on capability change
            self.flush();
            // Retain capability.
            self.cap = Some(cap.clone());
        }
        self.give_internal(data);
    }

    /// Give an element and possibly flush the buffer. Note that this needs to have access
    /// to a capability, which the public functions ensure.
    fn give_internal(&mut self, data: (D, T, R)) {
        self.buffer.push(data);

        // Limit, if possible, the lifetime of the allocations for data
        // and consolidate smaller buffers if we're in the lucky case
        // of a small domain for D
        if self.buffer.len() >= 2 * self.previous_len {
            // Consolidate while the consolidation frees at least half the buffer
            consolidate_updates(&mut self.buffer);
            if self.buffer.len() > self.buffer.capacity() / 2 {
                self.flush();
            } else {
                self.previous_len = self.buffer.len();
            }
            // At this point, it is an invariant across give calls that self.previous_len
            // will be in the interval [0, self.buffer.capacity() / 2]. So, we will enter
            // this if-statement block again when self.buffer.len() == self.buffer.capacity()
            // or earlier. If consolidation is not effective to keep self.buffer.len()
            // below half capacity, then flushing when more than half-full will
            // maintain the invariant.
        }
    }

    /// Flush the internal buffer to the underlying session
    pub fn flush(&mut self) {
        if let Some(cap) = &self.cap {
            self.output_handle.session(cap).give_vec(&mut self.buffer);

            // Ensure that the capacity is at least equal to the default in case
            // it was reduced by give_vec. Note that we cannot rely here on give_vec
            // returning us a buffer with zero capacity.
            if self.buffer.capacity() < ::timely::container::buffer::default_capacity::<(D, T, R)>()
            {
                let to_reserve = ::timely::container::buffer::default_capacity::<(D, T, R)>()
                    - self.buffer.capacity();
                self.buffer.reserve_exact(to_reserve);
            }
            self.previous_len = 0;
        }
    }
}

impl<'a, 'b, T, D: Data, R: Semigroup, P> Drop for ConsolidateBuffer<'a, 'b, T, D, R, P>
where
    P: Push<Bundle<T, Vec<(D, T, R)>>> + 'a,
    T: Data + Timestamp + 'a,
    D: 'a,
{
    fn drop(&mut self) {
        self.flush();
    }
}