#![allow(unused)]
use super::{base_cache::Inner, PredicateId, PredicateIdStr};
use crate::{
common::{
concurrent::{
thread_pool::{PoolName, ThreadPool, ThreadPoolRegistry},
unsafe_weak_pointer::UnsafeWeakPointer,
AccessTime, KvEntry, ValueEntry,
},
time::Instant,
},
PredicateError,
};
use parking_lot::{Mutex, RwLock};
use std::{
collections::HashMap,
hash::{BuildHasher, Hash},
marker::PhantomData,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Weak,
},
time::Duration,
};
use triomphe::Arc as TrioArc;
use uuid::Uuid;
/// Shared, thread-safe predicate closure taking a key and a value.
///
/// When a predicate returns `true` for an entry, the scan in `ScanTask::do_execute`
/// goes on to invalidate that entry.
pub(crate) type PredicateFun<K, V> = Arc<dyn Fn(&K, &V) -> bool + Send + Sync + 'static>;
/// Entry access that the invalidation scan needs from the cache.
///
/// `ScanTask` requires this trait on `Arc<C>` for the cache type `C` it scans.
pub(crate) trait GetOrRemoveEntry<K, V> {
/// Returns the value entry stored for `key`, or `None` if there is none.
///
/// NOTE(review): `hash` is presumably the precomputed hash of `key` — confirm
/// against the implementor.
fn get_value_entry(&self, key: &Arc<K>, hash: u64) -> Option<TrioArc<ValueEntry<K, V>>>;
/// Conditionally removes the entry stored for `key`.
///
/// NOTE(review): presumably the entry is removed only when `condition` returns
/// `true`, and the removed entry is returned — confirm against the implementor.
fn remove_key_value_if(
&self,
key: &Arc<K>,
hash: u64,
condition: impl FnMut(&Arc<K>, &TrioArc<ValueEntry<K, V>>) -> bool,
) -> Option<TrioArc<ValueEntry<K, V>>>
where
K: Send + Sync + 'static,
V: Clone + Send + Sync + 'static;
}
/// A scan candidate: a shared key, its hash, and a timestamp.
///
/// `ScanTask` only acts on a candidate while the cached entry's `last_modified()`
/// still equals `timestamp`.
pub(crate) struct KeyDateLite<K> {
/// Shared handle to the key.
key: Arc<K>,
/// Hash passed along with the key to the cache lookups.
hash: u64,
/// Timestamp compared against the entry's `last_modified()` during the scan.
timestamp: Instant,
}
impl<K> Clone for KeyDateLite<K> {
    /// Clones the candidate by sharing the key `Arc` and copying the hash and
    /// timestamp. Implemented by hand so that no `K: Clone` bound is required.
    fn clone(&self) -> Self {
        let Self {
            key,
            hash,
            timestamp,
        } = self;
        Self {
            key: Arc::clone(key),
            hash: *hash,
            timestamp: *timestamp,
        }
    }
}
impl<K> KeyDateLite<K> {
    /// Builds a candidate that shares ownership of `key` and records the given
    /// `hash` and `timestamp`.
    pub(crate) fn new(key: &Arc<K>, hash: u64, timestamp: Instant) -> Self {
        let key = Arc::clone(key);
        Self {
            key,
            hash,
            timestamp,
        }
    }
}
/// Outcome of a finished scan task, as returned by `Invalidator::task_result`.
pub(crate) struct InvalidationResult<K, V> {
/// Entries that the scan removed from the cache.
pub(crate) invalidated: Vec<KvEntry<K, V>>,
/// `true` when the scan context holds no more predicates after this result
/// has been processed.
pub(crate) is_done: bool,
}
impl<K, V> InvalidationResult<K, V> {
    /// Bundles the removed entries with the "no predicates left" flag.
    fn new(invalidated: Vec<KvEntry<K, V>>, is_done: bool) -> Self {
        let result = Self {
            invalidated,
            is_done,
        };
        result
    }
}
/// Registry of invalidation predicates plus the machinery to scan cache
/// entries against them on a thread pool.
pub(crate) struct Invalidator<K, V, S> {
/// All registered predicates, keyed by their id.
predicates: RwLock<HashMap<PredicateId, Predicate<K, V>>>,
/// Lock-free flag: set to `false` when a predicate is registered and to
/// `true` when a removal leaves the registry empty.
is_empty: AtomicBool,
/// State shared with the scan tasks submitted to the thread pool.
scan_context: Arc<ScanContext<K, V, S>>,
/// Pool acquired from `ThreadPoolRegistry`; released again on drop.
thread_pool: Arc<ThreadPool>,
}
impl<K, V, S> Drop for Invalidator<K, V, S> {
    /// Shuts the invalidator down: refuses new scan tasks, waits for a running
    /// one to finish, then releases the thread pool.
    fn drop(&mut self) {
        let poll_interval = Duration::from_millis(1);

        // From now on `submit_task` returns without scheduling anything.
        self.scan_context
            .is_shutting_down
            .store(true, Ordering::Release);

        // Poll until the in-flight scan task (if any) has cleared its flag.
        loop {
            let still_running = self.scan_context.is_running.load(Ordering::Acquire);
            if !still_running {
                break;
            }
            std::thread::sleep(poll_interval);
        }

        ThreadPoolRegistry::release_pool(&self.thread_pool);
    }
}
impl<K, V, S> Invalidator<K, V, S> {
    /// Creates an invalidator for the cache behind `cache`.
    ///
    /// The predicate registry starts empty and a thread pool is acquired from
    /// `ThreadPoolRegistry` under `PoolName::Invalidator`.
    pub(crate) fn new(cache: Weak<Inner<K, V, S>>) -> Self {
        let thread_pool = ThreadPoolRegistry::acquire_pool(PoolName::Invalidator);
        Self {
            predicates: RwLock::new(HashMap::new()),
            is_empty: AtomicBool::new(true),
            scan_context: Arc::new(ScanContext::new(cache)),
            thread_pool,
        }
    }

    /// Returns `true` while no predicate is registered (lock-free check).
    pub(crate) fn is_empty(&self) -> bool {
        self.is_empty.load(Ordering::Acquire)
    }

    /// Removes every predicate whose registration time is at or before `ts`.
    ///
    /// The registry is filtered in place, so no ids are cloned and each entry is
    /// visited exactly once.
    pub(crate) fn remove_predicates_registered_before(&self, ts: Instant) {
        let mut pred_map = self.predicates.write();

        // Keep only the predicates registered strictly after `ts`.
        pred_map.retain(|_, pred| pred.registered_at > ts);

        if pred_map.is_empty() {
            self.is_empty.store(true, Ordering::Release);
        }
    }

    /// Registers `predicate` under a freshly generated UUID and returns that id.
    ///
    /// # Panics
    ///
    /// Panics if no unused id could be generated within `MAX_RETRY` attempts.
    pub(crate) fn register_predicate(
        &self,
        predicate: PredicateFun<K, V>,
        registered_at: Instant,
    ) -> Result<PredicateId, PredicateError> {
        const MAX_RETRY: usize = 1_000;
        let mut preds = self.predicates.write();

        for _ in 0..MAX_RETRY {
            let id = Uuid::new_v4().as_hyphenated().to_string();
            if preds.contains_key(&id) {
                // Id collision: try again with a new UUID.
                continue;
            }
            let pred = Predicate::new(&id, predicate, registered_at);
            preds.insert(id.clone(), pred);
            self.is_empty.store(false, Ordering::Release);
            return Ok(id);
        }

        panic!("Cannot assign a new PredicateId to a predicate");
    }

    /// Returns `true` if any registered predicate applies to `entry` and matches
    /// its key and value.
    ///
    /// Returns `false` without taking the registry lock when no predicate is
    /// registered, and `false` when the entry has no `last_modified()` timestamp.
    #[inline]
    pub(crate) fn apply_predicates(&self, key: &Arc<K>, entry: &TrioArc<ValueEntry<K, V>>) -> bool {
        if self.is_empty() {
            false
        } else if let Some(ts) = entry.last_modified() {
            Self::do_apply_predicates(self.predicates.read().values(), key, &entry.value, ts)
        } else {
            false
        }
    }

    /// Returns `true` while a submitted scan task has not finished yet.
    pub(crate) fn is_task_running(&self) -> bool {
        self.scan_context.is_running.load(Ordering::Acquire)
    }

    /// Schedules a scan of `candidates` on the thread pool.
    ///
    /// Does nothing once the invalidator has started shutting down.
    ///
    /// # Panics
    ///
    /// Panics if a task is still running or a previous result has not yet been
    /// collected with `task_result`.
    pub(crate) fn submit_task(&self, candidates: Vec<KeyDateLite<K>>, is_truncated: bool)
    where
        K: Hash + Eq + Send + Sync + 'static,
        V: Clone + Send + Sync + 'static,
        S: BuildHasher + Send + Sync + 'static,
    {
        let ctx = &self.scan_context;

        // Do not schedule anything while the invalidator is being dropped.
        if ctx.is_shutting_down.load(Ordering::Acquire) {
            return;
        }

        // There must be neither a pending task nor an uncollected result.
        assert!(!self.is_task_running());
        assert!(ctx.result.lock().is_none());

        // Snapshot the registry into the scan context, but only when the context
        // has no predicates left over from an earlier scan.
        {
            let mut ps = ctx.predicates.lock();
            if ps.is_empty() {
                *ps = self.predicates.read().values().cloned().collect();
            }
        }

        // Raise the flag before handing the task to the pool; the task clears it.
        self.scan_context.is_running.store(true, Ordering::Release);

        let task = ScanTask::new(&self.scan_context, candidates, is_truncated);
        self.thread_pool.pool.execute(move || {
            task.execute();
        });
    }

    /// Takes the result of the last finished scan task, if there is one.
    ///
    /// Collecting a result also retires the predicates that the scan has finished
    /// with. `is_done` in the returned value is `true` when the scan context has
    /// no predicates left afterwards.
    ///
    /// # Panics
    ///
    /// Panics if a scan task is still running.
    pub(crate) fn task_result(&self) -> Option<InvalidationResult<K, V>> {
        assert!(!self.is_task_running());
        let ctx = &self.scan_context;

        ctx.result.lock().take().map(|result| {
            self.remove_finished_predicates(ctx, &result);
            let is_done = ctx.predicates.lock().is_empty();
            InvalidationResult::new(result.invalidated, is_done)
        })
    }
}
impl<K, V, S> Invalidator<K, V, S> {
    /// Returns `true` as soon as one predicate is applicable at `ts` and matches
    /// `key` and `value`. Predicates after the first match are not evaluated.
    #[inline]
    fn do_apply_predicates<'a, I>(mut predicates: I, key: &'a K, value: &'a V, ts: Instant) -> bool
    where
        I: Iterator<Item = &'a Predicate<K, V>>,
    {
        predicates.any(|candidate| candidate.is_applicable(ts) && candidate.apply(key, value))
    }

    /// Retires the predicates that the scan described by `result` has finished with.
    ///
    /// After a complete (non-truncated) scan, every predicate in the scan context
    /// is retired. After a truncated scan, only those no longer applicable at the
    /// scan's newest timestamp are retired; the others stay in the context.
    fn remove_finished_predicates(&self, ctx: &ScanContext<K, V, S>, result: &ScanResult<K, V>) {
        let mut in_flight = ctx.predicates.lock();

        if !result.is_truncated {
            self.remove_predicates(&in_flight);
            in_flight.clear();
            return;
        }

        // Truncated scan without a timestamp: leave the context untouched.
        if let Some(newest) = result.newest_timestamp {
            let (still_active, done): (Vec<_>, Vec<_>) = in_flight
                .drain(..)
                .partition(|pred| pred.is_applicable(newest));
            self.remove_predicates(&done);
            *in_flight = still_active;
        }
    }

    /// Removes `predicates` from the registry and updates the `is_empty` flag
    /// when the registry ends up empty.
    fn remove_predicates(&self, predicates: &[Predicate<K, V>]) {
        let mut registry = self.predicates.write();
        for finished in predicates {
            registry.remove(finished.id());
        }
        if registry.is_empty() {
            self.is_empty.store(true, Ordering::Release);
        }
    }
}
#[cfg(test)]
impl<K, V, S> Invalidator<K, V, S> {
    /// Test-only helper: number of predicates currently in the registry.
    pub(crate) fn predicate_count(&self) -> usize {
        let registry = self.predicates.read();
        registry.len()
    }
}
/// State shared between an `Invalidator` and the scan tasks it submits.
struct ScanContext<K, V, S> {
/// Predicates used by the scan; filled from the registry by
/// `Invalidator::submit_task` when empty.
predicates: Mutex<Vec<Predicate<K, V>>>,
/// Pointer to the cache internals, built from a `Weak<Inner<K, V, S>>`.
cache: Mutex<UnsafeWeakPointer<Inner<K, V, S>>>,
/// Result stored by a finished scan task until `Invalidator::task_result`
/// takes it.
result: Mutex<Option<ScanResult<K, V>>>,
/// Set before a task is handed to the pool; cleared by the task when it ends.
is_running: AtomicBool,
/// Set when the `Invalidator` is dropped; stops new tasks from being submitted.
is_shutting_down: AtomicBool,
_marker: PhantomData<S>,
}
impl<K, V, S> ScanContext<K, V, S> {
    /// Creates an idle context for `cache`: no predicates, no pending result,
    /// not running and not shutting down.
    fn new(cache: Weak<Inner<K, V, S>>) -> Self {
        let cache_ptr = UnsafeWeakPointer::from_weak_arc(cache);
        Self {
            predicates: Mutex::new(Vec::new()),
            cache: Mutex::new(cache_ptr),
            result: Mutex::new(None),
            is_running: AtomicBool::new(false),
            is_shutting_down: AtomicBool::new(false),
            _marker: PhantomData,
        }
    }
}
/// A registered invalidation predicate.
struct Predicate<K, V> {
/// Id under which the predicate is stored in the registry.
id: PredicateId,
/// The closure evaluated against a key and a value.
f: PredicateFun<K, V>,
/// Registration time; the predicate applies only to entries last modified
/// at or before this instant (see `is_applicable`).
registered_at: Instant,
}
impl<K, V> Clone for Predicate<K, V> {
    /// Clones the id, shares the closure `Arc`, and copies the registration time.
    /// Implemented by hand so that no `K: Clone` or `V: Clone` bound is required.
    fn clone(&self) -> Self {
        let Self {
            id,
            f,
            registered_at,
        } = self;
        Self {
            id: id.clone(),
            f: Arc::clone(f),
            registered_at: *registered_at,
        }
    }
}
impl<K, V> Predicate<K, V> {
    /// Creates a predicate that owns a copy of `id` and wraps the closure `f`.
    fn new(id: PredicateIdStr<'_>, f: PredicateFun<K, V>, registered_at: Instant) -> Self {
        let id = id.to_string();
        Self {
            id,
            f,
            registered_at,
        }
    }

    /// Borrows the predicate's id.
    fn id(&self) -> PredicateIdStr<'_> {
        &self.id
    }

    /// Returns `true` when an entry last modified at `last_modified` is covered by
    /// this predicate, i.e. it was modified no later than the registration time.
    fn is_applicable(&self, last_modified: Instant) -> bool {
        let registered_at = self.registered_at;
        last_modified <= registered_at
    }

    /// Evaluates the wrapped closure against `key` and `value`.
    fn apply(&self, key: &K, value: &V) -> bool {
        let matcher = &self.f;
        matcher(key, value)
    }
}
/// One unit of scan work executed on the invalidator's thread pool.
struct ScanTask<K, V, S> {
/// Context shared with the `Invalidator` that submitted this task.
scan_context: Arc<ScanContext<K, V, S>>,
/// Entries to check against the predicates.
candidates: Vec<KeyDateLite<K>>,
/// Copied into the `ScanResult` as-is.
/// NOTE(review): presumably signals that `candidates` does not cover every
/// entry to be scanned — confirm against the caller of `submit_task`.
is_truncated: bool,
}
impl<K, V, S> ScanTask<K, V, S>
where
K: Hash + Eq,
S: BuildHasher,
{
fn new(
scan_context: &Arc<ScanContext<K, V, S>>,
candidates: Vec<KeyDateLite<K>>,
is_truncated: bool,
) -> Self {
Self {
scan_context: Arc::clone(scan_context),
candidates,
is_truncated,
}
}
fn execute(&self)
where
K: Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
let cache_lock = self.scan_context.cache.lock();
let weak = unsafe { cache_lock.as_weak_arc() };
if let Some(inner_cache) = weak.upgrade() {
*self.scan_context.result.lock() = Some(self.do_execute(&inner_cache));
self.scan_context.is_running.store(false, Ordering::Release);
UnsafeWeakPointer::forget_arc(inner_cache);
} else {
*self.scan_context.result.lock() = Some(ScanResult::default());
self.scan_context.is_running.store(false, Ordering::Release);
UnsafeWeakPointer::forget_weak_arc(weak);
}
}
fn do_execute<C>(&self, cache: &Arc<C>) -> ScanResult<K, V>
where
Arc<C>: GetOrRemoveEntry<K, V>,
K: Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
let predicates = self.scan_context.predicates.lock();
let mut invalidated = Vec::default();
let mut newest_timestamp = None;
for candidate in &self.candidates {
let key = &candidate.key;
let hash = candidate.hash;
let ts = candidate.timestamp;
if Self::apply(&predicates, cache, key, hash, ts) {
if let Some(entry) = Self::invalidate(cache, key, hash, ts) {
invalidated.push(KvEntry {
key: Arc::clone(key),
entry,
})
}
}
newest_timestamp = Some(ts);
}
ScanResult {
invalidated,
is_truncated: self.is_truncated,
newest_timestamp,
}
}
fn apply<C>(
predicates: &[Predicate<K, V>],
cache: &Arc<C>,
key: &Arc<K>,
hash: u64,
ts: Instant,
) -> bool
where
Arc<C>: GetOrRemoveEntry<K, V>,
{
if let Some(entry) = cache.get_value_entry(key, hash) {
if let Some(lm) = entry.last_modified() {
if lm == ts {
return Invalidator::<_, _, S>::do_apply_predicates(
predicates.iter(),
key,
&entry.value,
lm,
);
}
}
}
false
}
fn invalidate<C>(
cache: &Arc<C>,
key: &Arc<K>,
hash: u64,
ts: Instant,
) -> Option<TrioArc<ValueEntry<K, V>>>
where
Arc<C>: GetOrRemoveEntry<K, V>,
K: Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
cache.remove_key_value_if(key, hash, |_, v| {
if let Some(lm) = v.last_modified() {
lm == ts
} else {
false
}
})
}
}
/// Raw outcome of a scan task, stored in the scan context until collected.
struct ScanResult<K, V> {
/// Entries removed from the cache by the scan.
invalidated: Vec<KvEntry<K, V>>,
/// The `is_truncated` flag of the task that produced this result.
is_truncated: bool,
/// Timestamp of the last candidate the scan visited; `None` when it visited none.
newest_timestamp: Option<Instant>,
}
impl<K, V> Default for ScanResult<K, V> {
    /// An empty, non-truncated result with no timestamp. Implemented by hand so
    /// that no `K: Default` or `V: Default` bound is required.
    fn default() -> Self {
        let invalidated = Vec::new();
        Self {
            invalidated,
            is_truncated: false,
            newest_timestamp: None,
        }
    }
}