differential_dataflow/containers.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
//! A columnar container based on the columnation library.
use std::iter::FromIterator;
pub use columnation::*;
use timely::container::PushInto;
/// An append-only vector that stores records as columns.
///
/// This container maintains elements that might conventionally own
/// memory allocations, but instead the pointers to those allocations
/// reference larger regions of memory shared with multiple instances
/// of the type. Elements can be retrieved as references, and care is
/// taken when this type is dropped to ensure that the correct memory
/// is returned (rather than the incorrect memory, from running the
/// elements' `Drop` implementations).
pub struct TimelyStack<T: Columnation> {
// Per-element values as returned by the region's `copy`; they are only ever
// forgotten via `set_len`, never dropped individually by this type.
local: Vec<T>,
// The region that element contents are copied into.
inner: T::InnerRegion,
}
impl<T: Columnation> TimelyStack<T> {
    /// Construct a [TimelyStack], reserving space for `capacity` elements.
    ///
    /// Note that the associated region is not initialized to a specific capacity
    /// because we can't generally know how much space would be required.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            local: Vec::with_capacity(capacity),
            inner: T::InnerRegion::default(),
        }
    }

    /// Ensures `Self` can absorb `items` without further allocations.
    ///
    /// The argument `items` may be cloned and iterated multiple times.
    /// Please be careful if it contains side effects.
    #[inline(always)]
    pub fn reserve_items<'a, I>(&mut self, items: I)
    where
        I: Iterator<Item = &'a T> + Clone,
        T: 'a,
    {
        // One pass to count the elements for `local`, then hand the iterator to the region.
        self.local.reserve(items.clone().count());
        self.inner.reserve_items(items);
    }

    /// Ensures `Self` can absorb the contents of `regions` without further allocations.
    ///
    /// The argument `regions` may be cloned and iterated multiple times.
    /// Please be careful if it contains side effects.
    #[inline(always)]
    pub fn reserve_regions<'a, I>(&mut self, regions: I)
    where
        Self: 'a,
        I: Iterator<Item = &'a Self> + Clone,
    {
        // Room for every element of every stack, then let the region size itself
        // from the other stacks' regions.
        self.local.reserve(regions.clone().map(|cs| cs.local.len()).sum());
        self.inner.reserve_regions(regions.map(|cs| &cs.inner));
    }

    /// Copies an element in to the region.
    ///
    /// The element can be read by indexing.
    pub fn copy(&mut self, item: &T) {
        // TODO: Some types `T` should just be cloned.
        // E.g. types that are `Copy` or vecs of ZSTs.
        // SAFETY: the value returned by the region is only stored in `local`, whose
        // elements this type never drops individually (`clear` and `retain_from`
        // shrink via `set_len`).
        // NOTE(review): confirm this matches the contract of `Region::copy`.
        unsafe {
            self.local.push(self.inner.copy(item));
        }
    }

    /// Empties the collection.
    pub fn clear(&mut self) {
        unsafe {
            // Unsafety justified in that setting the length to zero exposes
            // no invalid data.
            self.local.set_len(0);
            self.inner.clear();
        }
    }

    /// Retain elements that pass a predicate, from a specified offset.
    ///
    /// Elements before `index` are left untouched. If `index` is at or beyond the
    /// current length there is nothing to examine and the stack is left unchanged.
    ///
    /// This method may or may not reclaim memory in the inner region.
    pub fn retain_from<P: FnMut(&T) -> bool>(&mut self, index: usize, mut predicate: P) {
        let len = self.local.len();
        if index >= len {
            // Without this guard an out-of-range `index` would reach `set_len(index)`
            // below and grow the vector over uninitialized slots.
            return;
        }
        let mut write_position = index;
        for position in index..len {
            if predicate(&self[position]) {
                // TODO: compact the inner region and update pointers.
                self.local.swap(position, write_position);
                write_position += 1;
            }
        }
        unsafe {
            // SAFETY: `write_position` starts at `index < len` and is incremented at
            // most once per examined position, so it is no greater than
            // `self.local.len()` and this exposes no invalid data.
            self.local.set_len(write_position);
        }
    }

    /// Unsafe access to `local` data. The slices store data that is backed by a region
    /// allocation. Therefore, it is undefined behavior to mutate elements of the `local` slice.
    ///
    /// # Safety
    /// Elements within `local` can be reordered, but not mutated, removed and/or dropped.
    pub unsafe fn local(&mut self) -> &mut [T] {
        &mut self.local[..]
    }

    /// Estimate the memory capacity in bytes.
    ///
    /// `callback` is first invoked with the used and allocated byte counts of the
    /// `local` vector (length and capacity times `size_of::<T>()`), and is then
    /// handed to the inner region's `heap_size`.
    #[inline]
    pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
        let size_of = std::mem::size_of::<T>();
        callback(self.local.len() * size_of, self.local.capacity() * size_of);
        self.inner.heap_size(callback);
    }

    /// Estimate the consumed memory capacity in bytes, summing both used and total capacity.
    ///
    /// Returns `(length, capacity)`, each the sum over all `heap_size` callbacks.
    #[inline]
    pub fn summed_heap_size(&self) -> (usize, usize) {
        let (mut length, mut capacity) = (0, 0);
        self.heap_size(|len, cap| {
            length += len;
            capacity += cap
        });
        (length, capacity)
    }

    /// The length in items.
    #[inline]
    pub fn len(&self) -> usize {
        self.local.len()
    }

    /// Returns `true` if the stack is empty.
    pub fn is_empty(&self) -> bool {
        self.local.is_empty()
    }

    /// The capacity of the local vector.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.local.capacity()
    }

    /// Reserve space for `additional` elements in the local vector.
    ///
    /// The inner region is not touched.
    #[inline]
    pub fn reserve(&mut self, additional: usize) {
        self.local.reserve(additional)
    }
}
impl<A: Columnation, B: Columnation> TimelyStack<(A, B)> {
    /// Appends the pair `(t1, t2)` to this column stack from its separate parts.
    ///
    /// Useful when the tuple does not exist as a single value, e.g. because
    /// only references to its individual components are at hand.
    ///
    /// The element can be read by indexing.
    pub fn copy_destructured(&mut self, t1: &A, t2: &B) {
        // Copy both components into the region first, then record the result.
        let pair = unsafe { self.inner.copy_destructured(t1, t2) };
        self.local.push(pair);
    }
}
impl<A: Columnation, B: Columnation, C: Columnation> TimelyStack<(A, B, C)> {
    /// Appends the triple `(r0, r1, r2)` to this column stack from its separate parts.
    ///
    /// Useful when the tuple does not exist as a single value, e.g. because
    /// only references to its individual components are at hand.
    ///
    /// The element can be read by indexing.
    pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) {
        // Copy all three components into the region first, then record the result.
        let triple = unsafe { self.inner.copy_destructured(r0, r1, r2) };
        self.local.push(triple);
    }
}
// Read-only slice view over the stored elements.
impl<T: Columnation> std::ops::Deref for TimelyStack<T> {
    type Target = [T];

    #[inline(always)]
    fn deref(&self) -> &Self::Target {
        self.local.as_slice()
    }
}
// Dropping empties the stack through the inherent `clear`, which forgets the
// elements in `local` (via `set_len(0)`) before the fields themselves are dropped.
impl<T: Columnation> Drop for TimelyStack<T> {
    fn drop(&mut self) {
        Self::clear(self);
    }
}
// The default stack is empty: no reserved element slots and a default region.
impl<T: Columnation> Default for TimelyStack<T> {
    fn default() -> Self {
        // A zero-capacity `Vec` does not allocate, exactly like `Vec::new()`.
        Self::with_capacity(0)
    }
}
// Builds a stack by copying every referenced element, in iteration order.
impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack<A> {
    fn from_iter<T: IntoIterator<Item = &'a A>>(iter: T) -> Self {
        let elements = iter.into_iter();
        // Pre-size the element vector from the iterator's lower bound.
        let (lower_bound, _) = elements.size_hint();
        let mut stack = Self::with_capacity(lower_bound);
        elements.for_each(|element| stack.copy(element));
        stack
    }
}
// Two stacks are equal when their element slices are equal.
impl<T: Columnation + PartialEq> PartialEq for TimelyStack<T> {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs): (&[T], &[T]) = (&self[..], &other[..]);
        lhs == rhs
    }
}
// Marker impl: equality is delegated to the element slices (see `PartialEq`), so it is
// a full equivalence relation whenever `T: Eq`.
impl<T: Columnation + Eq> Eq for TimelyStack<T> {}
// Formats the stack exactly like the slice of its elements.
impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for TimelyStack<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let elements: &[T] = &self[..];
        std::fmt::Debug::fmt(elements, f)
    }
}
// Cloning re-copies every element into the destination's own region.
impl<T: Columnation> Clone for TimelyStack<T> {
    fn clone(&self) -> Self {
        let mut duplicate = Self::default();
        self.local.iter().for_each(|item| duplicate.copy(item));
        duplicate
    }

    fn clone_from(&mut self, source: &Self) {
        // Empty `self` in place, then refill it from `source`.
        self.clear();
        source.local.iter().for_each(|item| self.copy(item));
    }
}
// Pushing an owned item copies it into the stack; the owned value is dropped
// normally when this call returns.
impl<T: Columnation> PushInto<T> for TimelyStack<T> {
    #[inline]
    fn push_into(&mut self, item: T) {
        let borrowed: &T = &item;
        self.copy(borrowed);
    }
}
// Pushing a reference copies the referenced item into the stack.
impl<T: Columnation> PushInto<&T> for TimelyStack<T> {
    #[inline]
    fn push_into(&mut self, item: &T) {
        Self::copy(self, item);
    }
}
// Pushing a double reference strips one level of indirection and copies the item.
impl<T: Columnation> PushInto<&&T> for TimelyStack<T> {
    #[inline]
    fn push_into(&mut self, item: &&T) {
        let inner: &T = *item;
        self.copy(inner);
    }
}
mod container {
    use std::ops::Deref;
    use columnation::Columnation;
    use timely::Container;
    use timely::container::SizableContainer;
    use crate::containers::TimelyStack;

    // `Container` implementation: elements are always handed out by reference.
    impl<T: Columnation> Container for TimelyStack<T> {
        type ItemRef<'a> = &'a T where Self: 'a;
        type Item<'a> = &'a T where Self: 'a;

        fn len(&self) -> usize {
            self.local.len()
        }

        fn is_empty(&self) -> bool {
            self.local.is_empty()
        }

        fn clear(&mut self) {
            // Delegate to the inherent `clear`, which also clears the region.
            TimelyStack::clear(self)
        }

        type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;

        fn iter(&self) -> Self::Iter<'_> {
            self.deref().iter()
        }

        type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a;

        // Yields references like `iter`; this call does not remove the elements.
        fn drain(&mut self) -> Self::DrainIter<'_> {
            (*self).iter()
        }
    }

    impl<T: Columnation> SizableContainer for TimelyStack<T> {
        // The stack is full once the local vector has no spare slots.
        fn at_capacity(&self) -> bool {
            self.len() == self.capacity()
        }

        // Makes sure the local vector can hold at least the default buffer capacity
        // for `T`, reusing `stash` when `self` has no allocation yet.
        fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
            if self.capacity() == 0 {
                // Take over the stashed container (or a fresh default) and empty it.
                *self = stash.take().unwrap_or_default();
                self.clear();
            }
            let preferred = timely::container::buffer::default_capacity::<T>();
            if self.capacity() < preferred {
                // `Vec::reserve` guarantees `capacity >= len + additional`, so the
                // shortfall must be measured from `len`. Measuring it from `capacity`
                // under-reserves whenever the vector is not full (e.g. a cleared stash
                // with a small capacity would never grow to `preferred`).
                // No underflow: `len <= capacity < preferred`.
                self.reserve(preferred - self.len());
            }
        }
    }
}