1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
//! A reference-counted wrapper sharing one owned trace.
//!
//! The types in this module, `TraceBox` and `TraceRc` and meant to parallel `RcBox` and `Rc` in `std::rc`.
//!
//! The first typee is an owned trace with some information about the cumulative requirements of the shared
//! handles. This is roughly how much progress has each made, so we know which "read capabilities" they have
//! collectively dropped, and when it is safe to inform the trace of such progress.
//!
//! The second type is a wrapper which presents as a `TraceReader`, but whose methods for advancing its read
//! capabilities interact with the `TraceBox` rather than directly with the owned trace. Ideally, instances
//! `TraceRc` should appear indistinguishable from the underlying trace from a reading perspective, with the
//! exception that the trace may not compact its representation as fast as if it were exclusively owned.

use std::rc::Rc;
use std::cell::RefCell;

use timely::progress::{Antichain, frontier::{AntichainRef, MutableAntichain}};

use crate::trace::TraceReader;

/// A wrapper around a trace which tracks the frontiers of all referees.
///
/// This is an internal type, unlikely to be useful to higher-level programs, but exposed just in case.
/// This type is equivalent to a `RefCell`, in that it wraps the mutable state that multiple referrers
/// may influence.
pub struct TraceBox<Tr: TraceReader> {
    /// accumulated holds on times for advancement.
    pub logical_compaction: MutableAntichain<Tr::Time>,
    /// accumulated holds on times for distinction.
    pub physical_compaction: MutableAntichain<Tr::Time>,
    /// The wrapped trace.
    pub trace: Tr,
}

impl<Tr: TraceReader> TraceBox<Tr> {
    /// Moves an existing trace into a shareable trace wrapper.
    ///
    /// The trace may already exist and have non-initial advance and distinguish frontiers. The boxing
    /// process will fish these out and make sure that they are used for the initial read capabilities.
    pub fn new(mut trace: Tr) -> Self {

        let mut logical_compaction = MutableAntichain::new();
        logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
        let mut physical_compaction = MutableAntichain::new();
        physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));

        TraceBox {
            logical_compaction,
            physical_compaction,
            trace,
        }
    }
    /// Replaces elements of `lower` with those of `upper`.
    #[inline]
    pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
        self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
        self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
        self.trace.set_logical_compaction(self.logical_compaction.frontier());
    }
    /// Replaces elements of `lower` with those of `upper`.
    #[inline]
    pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
        self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
        self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
        self.trace.set_physical_compaction(self.physical_compaction.frontier());
    }
}

/// A handle to a shared trace.
///
/// As long as the handle exists, the wrapped trace should continue to exist and will not advance its
/// timestamps past the frontier maintained by the handle. The intent is that such a handle appears as
/// if it is a privately maintained trace, despite being backed by shared resources.
pub struct TraceRc<Tr: TraceReader> {
    logical_compaction: Antichain<Tr::Time>,
    physical_compaction: Antichain<Tr::Time>,
    /// Wrapped trace. Please be gentle when using.
    pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
}

impl<Tr: TraceReader> TraceReader for TraceRc<Tr> {
    type Key<'a> = Tr::Key<'a>;
    type Val<'a> = Tr::Val<'a>;
    type Time = Tr::Time;
    type TimeGat<'a> = Tr::TimeGat<'a>;
    type Diff = Tr::Diff;
    type DiffGat<'a> = Tr::DiffGat<'a>;

    type Batch = Tr::Batch;
    type Storage = Tr::Storage;
    type Cursor = Tr::Cursor;

    /// Sets frontier to now be elements in `frontier`.
    ///
    /// This change may not have immediately observable effects. It informs the shared trace that this
    /// handle no longer requires access to times other than those in the future of `frontier`, but if
    /// there are other handles to the same trace, it may not yet be able to compact.
    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
        self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
        self.logical_compaction = frontier.to_owned();
    }
    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.logical_compaction.borrow() }
    /// Allows the trace to compact batches of times before `frontier`.
    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
        self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
        self.physical_compaction = frontier.to_owned();
    }
    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.physical_compaction.borrow() }
    /// Creates a new cursor over the wrapped trace.
    fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, Tr::Storage)> {
        ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
    }

    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
        ::std::cell::RefCell::borrow(&self.wrapper).trace.map_batches(f)
    }
}

impl<Tr: TraceReader> TraceRc<Tr> {
    /// Allocates a new handle from an existing wrapped wrapper.
    pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<Tr>>>) {

        let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));

        let handle = TraceRc {
            logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
            physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
            wrapper: wrapped.clone(),
        };

        (handle, wrapped)
    }
}

impl<Tr: TraceReader> Clone for TraceRc<Tr> {
    fn clone(&self) -> Self {
        // increase ref counts for this frontier
        self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
        self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow());
        TraceRc {
            logical_compaction: self.logical_compaction.clone(),
            physical_compaction: self.physical_compaction.clone(),
            wrapper: self.wrapper.clone(),
        }
    }
}

impl<Tr: TraceReader> Drop for TraceRc<Tr> {
    fn drop(&mut self) {
        self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
        self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow());
        self.logical_compaction = Antichain::new();
        self.physical_compaction = Antichain::new();
    }
}