moka::sync

Struct Cache

Source
pub struct Cache<K, V, S = RandomState> { /* private fields */ }
Expand description

A thread-safe concurrent synchronous in-memory cache.

Cache supports full concurrency of retrievals and a high expected concurrency for updates.

Cache utilizes a lock-free concurrent hash table as the central key-value storage. Cache performs a best-effort bounding of the map using an entry replacement algorithm to determine which entries to evict when the capacity is exceeded.

§Table of Contents

§Example: insert, get and invalidate

Cache entries are manually added using insert or get_with methods, and are stored in the cache until either evicted or manually invalidated.

Here’s an example of reading and updating a cache by using multiple threads:

use moka::sync::Cache;

use std::thread;

fn value(n: usize) -> String {
    format!("value {}", n)
}

const NUM_THREADS: usize = 16;
const NUM_KEYS_PER_THREAD: usize = 64;

// Create a cache that can store up to 10,000 entries.
let cache = Cache::new(10_000);

// Spawn threads and read and update the cache simultaneously.
let threads: Vec<_> = (0..NUM_THREADS)
    .map(|i| {
        // To share the same cache across the threads, clone it.
        // This is a cheap operation.
        let my_cache = cache.clone();
        let start = i * NUM_KEYS_PER_THREAD;
        let end = (i + 1) * NUM_KEYS_PER_THREAD;

        thread::spawn(move || {
            // Insert 64 entries. (NUM_KEYS_PER_THREAD = 64)
            for key in start..end {
                my_cache.insert(key, value(key));
                // get() returns Option<String>, a clone of the stored value.
                assert_eq!(my_cache.get(&key), Some(value(key)));
            }

            // Invalidate every 4 element of the inserted entries.
            for key in (start..end).step_by(4) {
                my_cache.invalidate(&key);
            }
        })
    })
    .collect();

// Wait for all threads to complete.
threads.into_iter().for_each(|t| t.join().expect("Failed"));

// Verify the result.
for key in 0..(NUM_THREADS * NUM_KEYS_PER_THREAD) {
    if key % 4 == 0 {
        assert_eq!(cache.get(&key), None);
    } else {
        assert_eq!(cache.get(&key), Some(value(key)));
    }
}

If you want to atomically initialize and insert a value when the key is not present, you might want to check other insertion methods get_with and try_get_with.

§Avoiding to clone the value at get

The return type of get method is Option<V> instead of Option<&V>. Every time get is called for an existing key, it creates a clone of the stored value V and returns it. This is because the Cache allows concurrent updates from threads so a value stored in the cache can be dropped or replaced at any time by any other thread. get cannot return a reference &V as it is impossible to guarantee the value outlives the reference.

If you want to store values that will be expensive to clone, wrap them by std::sync::Arc before storing in a cache. Arc is a thread-safe reference-counted pointer and its clone() method is cheap.

§Example: Size-based Eviction

use std::convert::TryInto;
use moka::sync::Cache;

// Evict based on the number of entries in the cache.
let cache = Cache::builder()
    // Up to 10,000 entries.
    .max_capacity(10_000)
    // Create the cache.
    .build();
cache.insert(1, "one".to_string());

// Evict based on the byte length of strings in the cache.
let cache = Cache::builder()
    // A weigher closure takes &K and &V and returns a u32
    // representing the relative size of the entry.
    .weigher(|_key, value: &String| -> u32 {
        value.len().try_into().unwrap_or(u32::MAX)
    })
    // This cache will hold up to 32MiB of values.
    .max_capacity(32 * 1024 * 1024)
    .build();
cache.insert(2, "two".to_string());

If your cache should not grow beyond a certain size, use the max_capacity method of the CacheBuilder to set the upper bound. The cache will try to evict entries that have not been used recently or very often.

At the cache creation time, a weigher closure can be set by the weigher method of the CacheBuilder. A weigher closure takes &K and &V as the arguments and returns a u32 representing the relative size of the entry:

  • If the weigher is not set, the cache will treat each entry has the same size of 1. This means the cache will be bounded by the number of entries.
  • If the weigher is set, the cache will call the weigher to calculate the weighted size (relative size) on an entry. This means the cache will be bounded by the total weighted size of entries.

Note that weighted sizes are not used when making eviction selections.

§Example: Time-based Expirations

Cache supports the following expiration policies:

  • Time to live: A cached entry will be expired after the specified duration past from insert.
  • Time to idle: A cached entry will be expired after the specified duration past from get or insert.
use moka::sync::Cache;
use std::time::Duration;

let cache = Cache::builder()
    // Time to live (TTL): 30 minutes
    .time_to_live(Duration::from_secs(30 * 60))
    // Time to idle (TTI):  5 minutes
    .time_to_idle(Duration::from_secs( 5 * 60))
    // Create the cache.
    .build();

// This entry will expire after 5 minutes (TTI) if there is no get().
cache.insert(0, "zero");

// This get() will extend the entry life for another 5 minutes.
cache.get(&0);

// Even though we keep calling get(), the entry will expire
// after 30 minutes (TTL) from the insert().

§Example: Eviction Listener

A Cache can be configured with an eviction listener, a closure that is called every time there is a cache eviction. The listener takes three parameters: the key and value of the evicted entry, and the RemovalCause to indicate why the entry was evicted.

An eviction listener can be used to keep other data structures in sync with the cache, for example.

The following example demonstrates how to use an eviction listener with time-to-live expiration to manage the lifecycle of temporary files on a filesystem. The cache stores the paths of the files, and when one of them has expired, the eviction lister will be called with the path, so it can remove the file from the filesystem.

// Cargo.toml
//
// [dependencies]
// anyhow = "1.0"

use moka::{sync::Cache, notification};

use anyhow::{anyhow, Context};
use std::{
    fs, io,
    path::{Path, PathBuf},
    sync::{Arc, RwLock},
    time::Duration,
};

/// The DataFileManager writes, reads and removes data files.
struct DataFileManager {
    base_dir: PathBuf,
    file_count: usize,
}

impl DataFileManager {
    fn new(base_dir: PathBuf) -> Self {
        Self {
            base_dir,
            file_count: 0,
        }
    }

    fn write_data_file(
        &mut self,
        key: impl AsRef<str>,
        contents: String
    ) -> io::Result<PathBuf> {
        // Use the key as a part of the filename.
        let mut path = self.base_dir.to_path_buf();
        path.push(key.as_ref());

        assert!(!path.exists(), "Path already exists: {:?}", path);

        // create the file at the path and write the contents to the file.
        fs::write(&path, contents)?;
        self.file_count += 1;
        println!("Created a data file at {:?} (file count: {})", path, self.file_count);
        Ok(path)
    }

    fn read_data_file(&self, path: impl AsRef<Path>) -> io::Result<String> {
        // Reads the contents of the file at the path, and return the contents.
        fs::read_to_string(path)
    }

    fn remove_data_file(&mut self, path: impl AsRef<Path>) -> io::Result<()> {
        // Remove the file at the path.
        fs::remove_file(path.as_ref())?;
        self.file_count -= 1;
        println!(
            "Removed a data file at {:?} (file count: {})",
            path.as_ref(),
            self.file_count
        );

        Ok(())
    }
}

fn main() -> anyhow::Result<()> {
    // Create an instance of the DataFileManager and wrap it with
    // Arc<RwLock<_>> so it can be shared across threads.
    let file_mgr = DataFileManager::new(std::env::temp_dir());
    let file_mgr = Arc::new(RwLock::new(file_mgr));

    let file_mgr1 = Arc::clone(&file_mgr);

    // Create an eviction lister closure.
    let listener = move |k, v: PathBuf, cause| {
        // Try to remove the data file at the path `v`.
        println!(
            "\n== An entry has been evicted. k: {:?}, v: {:?}, cause: {:?}",
            k, v, cause
        );

        // Acquire the write lock of the DataFileManager. We must handle
        // error cases here to prevent the listener from panicking.
        match file_mgr1.write() {
            Err(_e) => {
                eprintln!("The lock has been poisoned");
            }
            Ok(mut mgr) => {
                // Remove the data file using the DataFileManager.
                if let Err(_e) = mgr.remove_data_file(v.as_path()) {
                    eprintln!("Failed to remove a data file at {:?}", v);
                }
            }
        }
    };

    // Create the cache. Set time to live for two seconds and set the
    // eviction listener.
    let cache = Cache::builder()
        .max_capacity(100)
        .time_to_live(Duration::from_secs(2))
        .eviction_listener(listener)
        .build();

    // Insert an entry to the cache.
    // This will create and write a data file for the key "user1", store the
    // path of the file to the cache, and return it.
    println!("== try_get_with()");
    let key = "user1";
    let path = cache
        .try_get_with(key, || -> anyhow::Result<_> {
            let mut mgr = file_mgr
                .write()
                .map_err(|_e| anyhow::anyhow!("The lock has been poisoned"))?;
            let path = mgr
                .write_data_file(key, "user data".into())
                .with_context(|| format!("Failed to create a data file"))?;
            Ok(path)
        })
        .map_err(|e| anyhow!("{}", e))?;

    // Read the data file at the path and print the contents.
    println!("\n== read_data_file()");
    {
        let mgr = file_mgr
            .read()
            .map_err(|_e| anyhow::anyhow!("The lock has been poisoned"))?;
        let contents = mgr
            .read_data_file(path.as_path())
            .with_context(|| format!("Failed to read data from {:?}", path))?;
        println!("contents: {}", contents);
    }

    // Sleep for five seconds. While sleeping, the cache entry for key "user1"
    // will be expired and evicted, so the eviction lister will be called to
    // remove the file.
    std::thread::sleep(Duration::from_secs(5));

    Ok(())
}

§You should avoid eviction listener to panic

It is very important to make an eviction listener closure not to panic. Otherwise, the cache will stop calling the listener after a panic. This is an intended behavior because the cache cannot know whether it is memory safe or not to call the panicked lister again.

When a listener panics, the cache will swallow the panic and disable the listener. If you want to know when a listener panics and the reason of the panic, you can enable an optional logging feature of Moka and check error-level logs.

To enable the logging, do the followings:

  1. In Cargo.toml, add the crate feature logging for moka.
  2. Set the logging level for moka to error or any lower levels (warn, info, …):
    • If you are using the env_logger crate, you can achieve this by setting RUST_LOG environment variable to moka=error.
  3. If you have more than one caches, you may want to set a distinct name for each cache by using cache builder’s name method. The name will appear in the log.

§Delivery Modes for Eviction Listener

The DeliveryMode specifies how and when an eviction notifications should be delivered to an eviction listener. The sync caches (Cache and SegmentedCache) support two delivery modes: Immediate and Queued modes.

§Immediate Mode

Tne Immediate mode is the default delivery mode for the sync caches. Use this mode when it is import to keep the order of write operations and eviction notifications.

This mode has the following characteristics:

  • The listener is called immediately after an entry was evicted.
  • The listener is called by the thread who evicted the entry:
    • The calling thread can be a background eviction thread or a user thread invoking a cache write operation such as insert, get_with or invalidate.
    • The calling thread is blocked until the listener returns.
  • This mode guarantees that write operations and eviction notifications for a given cache key are ordered by the time when they occurred.
  • This mode adds some performance overhead to cache write operations as it uses internal per-key lock to guarantee the ordering.

§Queued Mode

Use this mode when write performance is more important than preserving the order of write operations and eviction notifications.

  • The listener will be called some time after an entry was evicted.
  • A notification will be stashed in a queue. The queue will be processed by dedicated notification thread(s) and that thread will call the listener.
  • This mode does not preserve the order of write operations and eviction notifications.
  • This mode adds almost no performance overhead to cache write operations as it does not use the per-key lock.

§Example: Queued Delivery Mode

Because the Immediate mode is the default mode for sync caches, the previous example was using it implicitly.

The following is the same example but modified for the Queued delivery mode. (Showing changed lines only)

// Cargo.toml
//
// [dependencies]
// anyhow = "1.0"
// uuid = { version = "1.1", features = ["v4"] }

use moka::{sync::Cache, notification};

// Use UUID crate to generate a random file name.
use uuid::Uuid;

impl DataFileManager {
    fn write_data_file(
        &mut self,
        _key: impl AsRef<str>,
        contents: String
    ) -> io::Result<PathBuf> {
        // We do not use the key for the filename anymore. Instead, we
        // use UUID to generate a unique filename for each call.
        loop {
            // Generate a file path with unique file name.
            let mut path = self.base_dir.to_path_buf();
            path.push(Uuid::new_v4().as_hyphenated().to_string());

            if path.exists() {
                continue; // This path is already taken by others. Retry.
            }

            // We have got a unique file path, so create the file at
            // the path and write the contents to the file.
            fs::write(&path, contents)?;
            self.file_count += 1;
            println!("Created a data file at {:?} (file count: {})", path, self.file_count);

            // Return the path.
            return Ok(path);
        }
    }

    // Other associate functions and methods are unchanged.
}

fn main() -> anyhow::Result<()> {
    // (Omitted unchanged lines)

    // Create an eviction lister closure.
    // let listener = ...

    // Create a listener configuration with Queued delivery mode.
    let listener_conf = notification::Configuration::builder()
        .delivery_mode(notification::DeliveryMode::Queued)
        .build();

    // Create the cache.
    let cache = Cache::builder()
        .max_capacity(100)
        .time_to_live(Duration::from_secs(2))
        // Set the eviction listener with the configuration.
        .eviction_listener_with_conf(listener, listener_conf)
        .build();

    // Insert an entry to the cache.
    // ...
    // Read the data file at the path and print the contents.
    // ...
    // Sleep for five seconds.
    // ...

    Ok(())
}

As you can see, DataFileManager::write_data_file method no longer uses the cache key for the file name. Instead, it generates a UUID-based unique file name on each call. This kind of treatment will be needed for Queued mode because notifications will be delivered with some delay.

For example, a user thread could do the followings:

  1. insert an entry, and create a file.
  2. The entry is evicted due to size constraint:
    • This will trigger an eviction notification but it will be fired some time later.
    • The notification listener will remove the file when it is called, but we cannot predict when the call would be made.
  3. insert the entry again, and create the file again.

In Queued mode, the notification of the eviction at step 2 can be delivered either before or after the re-insert at step 3. If the write_data_file method does not generate unique file name on each call and the notification has not been delivered before step 3, the user thread could overwrite the file created at step 1. And then the notification will be delivered and the eviction listener will remove a wrong file created at step 3 (instead of the correct one created at step 1). This will cause the cache entires and the files on the filesystem to become out of sync.

Generating unique file names prevents this problem, as the user thread will never overwrite the file created at step 1 and the eviction lister will never remove a wrong file.

§Thread Safety

All methods provided by the Cache are considered thread-safe, and can be safely accessed by multiple concurrent threads.

  • Cache<K, V, S> requires trait bounds Send, Sync and 'static for K (key), V (value) and S (hasher state).
  • Cache<K, V, S> will implement Send and Sync.

§Sharing a cache across threads

To share a cache across threads, do one of the followings:

  • Create a clone of the cache by calling its clone method and pass it to other thread.
  • Wrap the cache by a sync::OnceCell or sync::Lazy from once_cell create, and set it to a static variable.

Cloning is a cheap operation for Cache as it only creates thread-safe reference-counted pointers to the internal data structures.

§Hashing Algorithm

By default, Cache uses a hashing algorithm selected to provide resistance against HashDoS attacks. It will be the same one used by std::collections::HashMap, which is currently SipHash 1-3.

While SipHash’s performance is very competitive for medium sized keys, other hashing algorithms will outperform it for small keys such as integers as well as large keys such as long strings. However those algorithms will typically not protect against attacks such as HashDoS.

The hashing algorithm can be replaced on a per-Cache basis using the build_with_hasher method of the CacheBuilder. Many alternative algorithms are available on crates.io, such as the aHash crate.

Implementations§

Source§

impl<K, V, S> Cache<K, V, S>

Source

pub fn name(&self) -> Option<&str>

Returns cache’s name.

Source

pub fn policy(&self) -> Policy

Returns a read-only cache policy of this cache.

At this time, cache policy cannot be modified after cache creation. A future version may support to modify it.

Source

pub fn entry_count(&self) -> u64

Returns an approximate number of entries in this cache.

The value returned is an estimate; the actual count may differ if there are concurrent insertions or removals, or if some entries are pending removal due to expiration. This inaccuracy can be mitigated by performing a sync() first.

§Example
use moka::sync::Cache;

let cache = Cache::new(10);
cache.insert('n', "Netherland Dwarf");
cache.insert('l', "Lop Eared");
cache.insert('d', "Dutch");

// Ensure an entry exists.
assert!(cache.contains_key(&'n'));

// However, followings may print stale number zeros instead of threes.
println!("{}", cache.entry_count());   // -> 0
println!("{}", cache.weighted_size()); // -> 0

// To mitigate the inaccuracy, bring `ConcurrentCacheExt` trait to
// the scope so we can use `sync` method.
use moka::sync::ConcurrentCacheExt;
// Call `sync` to run pending internal tasks.
cache.sync();

// Followings will print the actual numbers.
println!("{}", cache.entry_count());   // -> 3
println!("{}", cache.weighted_size()); // -> 3
Source

pub fn weighted_size(&self) -> u64

Returns an approximate total weighted size of entries in this cache.

The value returned is an estimate; the actual size may differ if there are concurrent insertions or removals, or if some entries are pending removal due to expiration. This inaccuracy can be mitigated by performing a sync() first. See entry_count for a sample code.

Source§

impl<K, V> Cache<K, V, RandomState>
where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static,

Source

pub fn new(max_capacity: u64) -> Self

Constructs a new Cache<K, V> that will store up to the max_capacity.

To adjust various configuration knobs such as initial_capacity or time_to_live, use the CacheBuilder.

Source

pub fn builder() -> CacheBuilder<K, V, Cache<K, V, RandomState>>

Returns a CacheBuilder, which can builds a Cache or SegmentedCache with various configuration knobs.

Source§

impl<K, V, S> Cache<K, V, S>
where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, S: BuildHasher + Clone + Send + Sync + 'static,

Source

pub fn contains_key<Q>(&self, key: &Q) -> bool
where K: Borrow<Q>, Q: Hash + Eq + ?Sized,

Returns true if the cache contains a value for the key.

Unlike the get method, this method is not considered a cache read operation, so it does not update the historic popularity estimator or reset the idle timer for the key.

The key may be any borrowed form of the cache’s key type, but Hash and Eq on the borrowed form must match those for the key type.

Source

pub fn get<Q>(&self, key: &Q) -> Option<V>
where K: Borrow<Q>, Q: Hash + Eq + ?Sized,

Returns a clone of the value corresponding to the key.

If you want to store values that will be expensive to clone, wrap them by std::sync::Arc before storing in a cache. Arc is a thread-safe reference-counted pointer and its clone() method is cheap.

The key may be any borrowed form of the cache’s key type, but Hash and Eq on the borrowed form must match those for the key type.

Source

pub fn get_or_insert_with(&self, key: K, init: impl FnOnce() -> V) -> V

👎Deprecated since 0.8.0: Replaced with get_with

Deprecated, replaced with get_with

Source

pub fn get_or_try_insert_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
where F: FnOnce() -> Result<V, E>, E: Send + Sync + 'static,

👎Deprecated since 0.8.0: Replaced with try_get_with

Deprecated, replaced with try_get_with

Source

pub fn get_with(&self, key: K, init: impl FnOnce() -> V) -> V

Returns a clone of the value corresponding to the key. If the value does not exist, evaluates the init closure and inserts the output.

§Concurrent calls on the same key

This method guarantees that concurrent calls on the same not-existing key are coalesced into one evaluation of the init closure. Only one of the calls evaluates its closure, and other calls wait for that closure to complete.

The following code snippet demonstrates this behavior:

use moka::sync::Cache;
use std::{sync::Arc, thread};

const TEN_MIB: usize = 10 * 1024 * 1024; // 10MiB
let cache = Cache::new(100);

// Spawn four threads.
let threads: Vec<_> = (0..4_u8)
    .map(|task_id| {
        let my_cache = cache.clone();
        thread::spawn(move || {
            println!("Thread {} started.", task_id);

            // Try to insert and get the value for key1. Although all four
            // threads will call `get_with` at the same time, the `init` closure
            // must be evaluated only once.
            let value = my_cache.get_with("key1", || {
                println!("Thread {} inserting a value.", task_id);
                Arc::new(vec![0u8; TEN_MIB])
            });

            // Ensure the value exists now.
            assert_eq!(value.len(), TEN_MIB);
            assert!(my_cache.get(&"key1").is_some());

            println!("Thread {} got the value. (len: {})", task_id, value.len());
        })
    })
    .collect();

// Wait all threads to complete.
threads
    .into_iter()
    .for_each(|t| t.join().expect("Thread failed"));

Result

  • The init closure was called exactly once by thread 1.
  • Other threads were blocked until thread 1 inserted the value.
Thread 1 started.
Thread 0 started.
Thread 3 started.
Thread 2 started.
Thread 1 inserting a value.
Thread 2 got the value. (len: 10485760)
Thread 1 got the value. (len: 10485760)
Thread 0 got the value. (len: 10485760)
Thread 3 got the value. (len: 10485760)
§Panics

This method panics when the init closure has panicked. When it happens, only the caller whose init closure panicked will get the panic (e.g. only thread 1 in the above sample). If there are other calls in progress (e.g. thread 0, 2 and 3 above), this method will restart and resolve one of the remaining init closure.

Source

pub fn get_with_by_ref<Q>(&self, key: &Q, init: impl FnOnce() -> V) -> V
where K: Borrow<Q>, Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,

Similar to get_with, but instead of passing an owned key, you can pass a reference to the key. If the key does not exist in the cache, the key will be cloned to create new entry in the cache.

Source

pub fn get_with_if( &self, key: K, init: impl FnOnce() -> V, replace_if: impl FnMut(&V) -> bool, ) -> V

Works like get_with, but takes an additional replace_if closure.

This method will evaluate the init closure and insert the output to the cache when:

  • The key does not exist.
  • Or, replace_if closure returns true.
Source

pub fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
where F: FnOnce() -> Option<V>,

Returns a clone of the value corresponding to the key. If the value does not exist, evaluates the init closure, and inserts the value if Some(value) was returned. If None was returned from the closure, this method does not insert a value and returns None.

§Concurrent calls on the same key

This method guarantees that concurrent calls on the same not-existing key are coalesced into one evaluation of the init closure. Only one of the calls evaluates its closure, and other calls wait for that closure to complete.

The following code snippet demonstrates this behavior:

use moka::sync::Cache;
use std::{path::Path, thread};

/// This function tries to get the file size in bytes.
fn get_file_size(thread_id: u8, path: impl AsRef<Path>) -> Option<u64> {
    println!("get_file_size() called by thread {}.", thread_id);
    std::fs::metadata(path).ok().map(|m| m.len())
}

let cache = Cache::new(100);

// Spawn four threads.
let threads: Vec<_> = (0..4_u8)
    .map(|thread_id| {
        let my_cache = cache.clone();
        thread::spawn(move || {
            println!("Thread {} started.", thread_id);

            // Try to insert and get the value for key1. Although all four
            // threads will call `optionally_get_with` at the same time,
            // get_file_size() must be called only once.
            let value = my_cache.optionally_get_with(
                "key1",
                || get_file_size(thread_id, "./Cargo.toml"),
            );

            // Ensure the value exists now.
            assert!(value.is_some());
            assert!(my_cache.get(&"key1").is_some());

            println!(
                "Thread {} got the value. (len: {})",
                thread_id,
                value.unwrap()
            );
        })
    })
    .collect();

// Wait all threads to complete.
threads
    .into_iter()
    .for_each(|t| t.join().expect("Thread failed"));

Result

  • get_file_size() was called exactly once by thread 0.
  • Other threads were blocked until thread 0 inserted the value.
Thread 0 started.
Thread 1 started.
Thread 2 started.
get_file_size() called by thread 0.
Thread 3 started.
Thread 2 got the value. (len: 1466)
Thread 0 got the value. (len: 1466)
Thread 1 got the value. (len: 1466)
Thread 3 got the value. (len: 1466)
§Panics

This method panics when the init closure has panicked. When it happens, only the caller whose init closure panicked will get the panic (e.g. only thread 1 in the above sample). If there are other calls in progress (e.g. thread 0, 2 and 3 above), this method will restart and resolve one of the remaining init closure.

Source

pub fn optionally_get_with_by_ref<F, Q>(&self, key: &Q, init: F) -> Option<V>
where F: FnOnce() -> Option<V>, K: Borrow<Q>, Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,

Similar to optionally_get_with, but instead of passing an owned key, you can pass a reference to the key. If the key does not exist in the cache, the key will be cloned to create new entry in the cache.

Source

pub fn try_get_with<F, E>(&self, key: K, init: F) -> Result<V, Arc<E>>
where F: FnOnce() -> Result<V, E>, E: Send + Sync + 'static,

Returns a clone of the value corresponding to the key. If the value does not exist, evaluates the init closure, and inserts the value if Ok(value) was returned. If Err(_) was returned from the closure, this method does not insert a value and returns the Err wrapped by std::sync::Arc.

§Concurrent calls on the same key

This method guarantees that concurrent calls on the same not-existing key are coalesced into one evaluation of the init closure (as long as these closures return the same error type). Only one of the calls evaluates its closure, and other calls wait for that closure to complete.

The following code snippet demonstrates this behavior:

use moka::sync::Cache;
use std::{path::Path, thread};

/// This function tries to get the file size in bytes.
fn get_file_size(thread_id: u8, path: impl AsRef<Path>) -> Result<u64, std::io::Error> {
    println!("get_file_size() called by thread {}.", thread_id);
    Ok(std::fs::metadata(path)?.len())
}

let cache = Cache::new(100);

// Spawn four threads.
let threads: Vec<_> = (0..4_u8)
    .map(|thread_id| {
        let my_cache = cache.clone();
        thread::spawn(move || {
            println!("Thread {} started.", thread_id);

            // Try to insert and get the value for key1. Although all four
            // threads will call `try_get_with` at the same time,
            // get_file_size() must be called only once.
            let value = my_cache.try_get_with(
                "key1",
                || get_file_size(thread_id, "./Cargo.toml"),
            );

            // Ensure the value exists now.
            assert!(value.is_ok());
            assert!(my_cache.get(&"key1").is_some());

            println!(
                "Thread {} got the value. (len: {})",
                thread_id,
                value.unwrap()
            );
        })
    })
    .collect();

// Wait all threads to complete.
threads
    .into_iter()
    .for_each(|t| t.join().expect("Thread failed"));

Result

  • get_file_size() was called exactly once by thread 1.
  • Other threads were blocked until thread 1 inserted the value.
Thread 1 started.
Thread 2 started.
get_file_size() called by thread 1.
Thread 3 started.
Thread 0 started.
Thread 2 got the value. (len: 1466)
Thread 0 got the value. (len: 1466)
Thread 1 got the value. (len: 1466)
Thread 3 got the value. (len: 1466)
§Panics

This method panics when the init closure has panicked. When it happens, only the caller whose init closure panicked will get the panic (e.g. only thread 1 in the above sample). If there are other calls in progress (e.g. thread 0, 2 and 3 above), this method will restart and resolve one of the remaining init closure.

Source

pub fn try_get_with_by_ref<F, E, Q>( &self, key: &Q, init: F, ) -> Result<V, Arc<E>>
where F: FnOnce() -> Result<V, E>, E: Send + Sync + 'static, K: Borrow<Q>, Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,

Similar to try_get_with, but instead of passing an owned key, you can pass a reference to the key. If the key does not exist in the cache, the key will be cloned to create new entry in the cache.

Source

pub fn insert(&self, key: K, value: V)

Inserts a key-value pair into the cache.

If the cache has this key present, the value is updated.

Source

pub fn invalidate<Q>(&self, key: &Q)
where K: Borrow<Q>, Q: Hash + Eq + ?Sized,

Discards any cached value for the key.

The key may be any borrowed form of the cache’s key type, but Hash and Eq on the borrowed form must match those for the key type.

Source

pub fn invalidate_all(&self)

Discards all cached values.

This method returns immediately and a background thread will evict all the cached values inserted before the time when this method was called. It is guaranteed that the get method must not return these invalidated values even if they have not been evicted.

Like the invalidate method, this method does not clear the historic popularity estimator of keys so that it retains the client activities of trying to retrieve an item.

Source

pub fn invalidate_entries_if<F>( &self, predicate: F, ) -> Result<PredicateId, PredicateError>
where F: Fn(&K, &V) -> bool + Send + Sync + 'static,

Discards cached values that satisfy a predicate.

invalidate_entries_if takes a closure that returns true or false. This method returns immediately and a background thread will apply the closure to each cached value inserted before the time when invalidate_entries_if was called. If the closure returns true on a value, that value will be evicted from the cache.

Also the get method will apply the closure to a value to determine if it should have been invalidated. Therefore, it is guaranteed that the get method must not return invalidated values.

Note that you must call CacheBuilder::support_invalidation_closures at the cache creation time as the cache needs to maintain additional internal data structures to support this method. Otherwise, calling this method will fail with a PredicateError::InvalidationClosuresDisabled.

Like the invalidate method, this method does not clear the historic popularity estimator of keys so that it retains the client activities of trying to retrieve an item.

Source

pub fn iter(&self) -> Iter<'_, K, V>

Creates an iterator visiting all key-value pairs in arbitrary order. The iterator element type is (Arc<K>, V), where V is a clone of a stored value.

Iterators do not block concurrent reads and writes on the cache. An entry can be inserted to, invalidated or evicted from a cache while iterators are alive on the same cache.

Unlike the get method, visiting entries via an iterator do not update the historic popularity estimator or reset idle timers for keys.

§Guarantees

In order to allow concurrent access to the cache, iterator’s next method does not guarantee the following:

  • It does not guarantee to return a key-value pair (an entry) if its key has been inserted to the cache after the iterator was created.
    • Such an entry may or may not be returned depending on key’s hash and timing.

and the next method guarantees the followings:

  • It guarantees not to return the same entry more than once.
  • It guarantees not to return an entry if it has been removed from the cache after the iterator was created.
    • Note: An entry can be removed by following reasons:
      • Manually invalidated.
      • Expired (e.g. time-to-live).
      • Evicted as the cache capacity exceeded.
§Examples
use moka::sync::Cache;

let cache = Cache::new(100);
cache.insert("Julia", 14);

let mut iter = cache.iter();
let (k, v) = iter.next().unwrap(); // (Arc<K>, V)
assert_eq!(*k, "Julia");
assert_eq!(v, 14);

assert!(iter.next().is_none());

Trait Implementations§

Source§

impl<K, V, S> Clone for Cache<K, V, S>

Source§

fn clone(&self) -> Self

Makes a clone of this shared cache.

This operation is cheap as it only creates thread-safe reference counted pointers to the shared internal data structures.

1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<K, V, S> ConcurrentCacheExt<K, V> for Cache<K, V, S>
where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, S: BuildHasher + Clone + Send + Sync + 'static,

Source§

fn sync(&self)

Performs any pending maintenance operations needed by the cache.
Source§

impl<K, V, S> Debug for Cache<K, V, S>
where K: Debug + Eq + Hash + Send + Sync + 'static, V: Debug + Clone + Send + Sync + 'static, S: BuildHasher + Clone + Send + Sync + 'static,

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<'a, K, V, S> IntoIterator for &'a Cache<K, V, S>
where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, S: BuildHasher + Clone + Send + Sync + 'static,

Source§

type Item = (Arc<K>, V)

The type of the elements being iterated over.
Source§

type IntoIter = Iter<'a, K, V>

Which kind of iterator are we turning this into?
Source§

fn into_iter(self) -> Self::IntoIter

Creates an iterator from a value. Read more
Source§

impl<K, V, S> Send for Cache<K, V, S>
where K: Send + Sync, V: Send + Sync, S: Send,

Source§

impl<K, V, S> Sync for Cache<K, V, S>
where K: Send + Sync, V: Send + Sync, S: Sync,

Auto Trait Implementations§

§

impl<K, V, S> Freeze for Cache<K, V, S>

§

impl<K, V, S = RandomState> !RefUnwindSafe for Cache<K, V, S>

§

impl<K, V, S> Unpin for Cache<K, V, S>

§

impl<K, V, S = RandomState> !UnwindSafe for Cache<K, V, S>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dst: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dst. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> Pointable for T

Source§

const ALIGN: usize = _

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.