1#![allow(dead_code, missing_docs)]
13
14use columnar::{Container, Ref};
15use differential_dataflow::operators::arrange::Arranged;
16use differential_dataflow::operators::arrange::TraceAgent;
17use differential_dataflow::trace::implementations::chunker::ColumnationChunker;
18use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
19use differential_dataflow::trace::implementations::merge_batcher::container::ColInternalMerger;
20use differential_dataflow::trace::wrappers::enter::TraceEnter;
21use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
22use mz_repr::Diff;
23use mz_storage_types::errors::DataflowError;
24use timely::dataflow::ScopeParent;
25
26use crate::row_spine::RowValBuilder;
27use crate::typedefs::spines::{ColKeyBatcher, ColKeyBuilder, ColValBatcher, ColValBuilder};
28
29pub use crate::row_spine::{RowRowSpine, RowSpine, RowValBatcher, RowValSpine};
30pub use crate::typedefs::spines::{ColKeySpine, ColValSpine};
31
32pub(crate) mod spines {
33 use std::rc::Rc;
34
35 use differential_dataflow::containers::{Columnation, TimelyStack};
36 use differential_dataflow::trace::implementations::ord_neu::{
37 OrdKeyBatch, OrdKeyBuilder, OrdValBatch, OrdValBuilder,
38 };
39 use differential_dataflow::trace::implementations::spine_fueled::Spine;
40 use differential_dataflow::trace::implementations::{Layout, Update};
41 use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
42
43 use crate::row_spine::OffsetOptimized;
44 use crate::typedefs::{KeyBatcher, KeyValBatcher};
45
46 pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<MzStack<((K, V), T, R)>>>>;
48 pub type ColValBatcher<K, V, T, R> = KeyValBatcher<K, V, T, R>;
49 pub type ColValBuilder<K, V, T, R> =
50 RcBuilder<OrdValBuilder<MzStack<((K, V), T, R)>, TimelyStack<((K, V), T, R)>>>;
51
52 pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<MzStack<((K, ()), T, R)>>>>;
54 pub type ColKeyBatcher<K, T, R> = KeyBatcher<K, T, R>;
55 pub type ColKeyBuilder<K, T, R> =
56 RcBuilder<OrdKeyBuilder<MzStack<((K, ()), T, R)>, TimelyStack<((K, ()), T, R)>>>;
57
58 pub struct MzStack<U: Update> {
60 phantom: std::marker::PhantomData<U>,
61 }
62
63 impl<U: Update> Layout for MzStack<U>
64 where
65 U::Key: Columnation + 'static,
66 U::Val: Columnation + 'static,
67 U::Time: Columnation,
68 U::Diff: Columnation,
69 {
70 type KeyContainer = TimelyStack<U::Key>;
71 type ValContainer = TimelyStack<U::Val>;
72 type TimeContainer = TimelyStack<U::Time>;
73 type DiffContainer = TimelyStack<U::Diff>;
74 type OffsetContainer = OffsetOptimized;
75 }
76}
77
78pub type KeyValSpine<K, V, T, R> = ColValSpine<K, V, T, R>;
83pub type KeyValAgent<K, V, T, R> = TraceAgent<KeyValSpine<K, V, T, R>>;
84pub type KeyValEnter<K, V, T, R, TEnter> =
85 TraceEnter<TraceFrontier<KeyValAgent<K, V, T, R>>, TEnter>;
86
87pub type KeySpine<K, T, R> = ColKeySpine<K, T, R>;
89pub type KeyAgent<K, T, R> = TraceAgent<KeySpine<K, T, R>>;
90pub type KeyEnter<K, T, R, TEnter> = TraceEnter<TraceFrontier<KeyAgent<K, T, R>>, TEnter>;
91
92pub type RowValAgent<V, T, R> = TraceAgent<RowValSpine<V, T, R>>;
94pub type RowValArrangement<S, V> = Arranged<S, RowValAgent<V, <S as ScopeParent>::Timestamp, Diff>>;
95pub type RowValEnter<V, T, R, TEnter> = TraceEnter<TraceFrontier<RowValAgent<V, T, R>>, TEnter>;
96pub type RowRowAgent<T, R> = TraceAgent<RowRowSpine<T, R>>;
98pub type RowRowArrangement<S> = Arranged<S, RowRowAgent<<S as ScopeParent>::Timestamp, Diff>>;
99pub type RowRowEnter<T, R, TEnter> = TraceEnter<TraceFrontier<RowRowAgent<T, R>>, TEnter>;
100pub type RowAgent<T, R> = TraceAgent<RowSpine<T, R>>;
102pub type RowArrangement<S> = Arranged<S, RowAgent<<S as ScopeParent>::Timestamp, Diff>>;
103pub type RowEnter<T, R, TEnter> = TraceEnter<TraceFrontier<RowAgent<T, R>>, TEnter>;
104
105pub type ErrSpine<T, R> = ColKeySpine<DataflowError, T, R>;
107pub type ErrBatcher<T, R> = ColKeyBatcher<DataflowError, T, R>;
108pub type ErrBuilder<T, R> = ColKeyBuilder<DataflowError, T, R>;
109
110pub type ErrAgent<T, R> = TraceAgent<ErrSpine<T, R>>;
111pub type ErrEnter<T, TEnter> = TraceEnter<TraceFrontier<ErrAgent<T, Diff>>, TEnter>;
112
113pub type KeyErrSpine<K, T, R> = ColValSpine<K, DataflowError, T, R>;
114pub type KeyErrBatcher<K, T, R> = ColValBatcher<K, DataflowError, T, R>;
115pub type KeyErrBuilder<K, T, R> = ColValBuilder<K, DataflowError, T, R>;
116
117pub type RowErrSpine<T, R> = RowValSpine<DataflowError, T, R>;
118pub type RowErrBatcher<T, R> = RowValBatcher<DataflowError, T, R>;
119pub type RowErrBuilder<T, R> = RowValBuilder<DataflowError, T, R>;
120
121pub type KeyBatcher<K, T, D> = KeyValBatcher<K, (), T, D>;
123pub type KeyValBatcher<K, V, T, D> = MergeBatcher<
124 Vec<((K, V), T, D)>,
125 ColumnationChunker<((K, V), T, D)>,
126 ColInternalMerger<(K, V), T, D>,
127>;
128
129pub trait MzTimestamp:
131 MzData + timely::progress::Timestamp + differential_dataflow::lattice::Lattice + std::hash::Hash
132{
133}
134
135impl<T> MzTimestamp for T
136where
137 T: MzData,
138 T: timely::progress::Timestamp,
139 T: differential_dataflow::lattice::Lattice + std::hash::Hash,
140{
141}
142
143pub trait MzData:
146 differential_dataflow::containers::Columnation
147 + for<'a> columnar::Columnar<Container: Container<Ref<'a>: Copy + Ord> + Clone + Send>
148{
149}
150
151impl<T> MzData for T
152where
153 T: differential_dataflow::containers::Columnation,
154 T: for<'a> columnar::Columnar<Container: Clone + Send>,
155 for<'a> Ref<'a, T>: Copy + Ord,
156{
157}
158
159pub trait MzArrangeData: differential_dataflow::containers::Columnation {}
160impl<T> MzArrangeData for T where T: differential_dataflow::containers::Columnation {}