tower_lsp/service/client/pending.rs

//! Types for tracking server-to-client JSON-RPC requests.

use std::fmt::{self, Debug, Formatter};
use std::future::Future;

use dashmap::{mapref::entry::Entry, DashMap};
use futures::channel::oneshot;
use tracing::warn;

use crate::jsonrpc::{Id, Response};

/// A hashmap containing pending client requests, keyed by request ID.
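///
/// A rough sketch of the intended flow, with an illustrative request ID (`incoming_response`
/// stands in for a response read off the wire; the block is marked `ignore`, so it is not
/// compiled as a doctest):
///
/// ```ignore
/// let pending = Pending::new();
///
/// // Register interest in the response _before_ sending the request to the client.
/// let response_fut = pending.wait(Id::Number(1));
///
/// // ... later, when a `Response` with the same ID is read off the wire:
/// pending.insert(incoming_response.clone());
///
/// // The registered future now resolves to that response.
/// assert_eq!(response_fut.await, incoming_response);
/// ```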
pub struct Pending(DashMap<Id, Vec<oneshot::Sender<Response>>>);

impl Pending {
    /// Creates a new pending client requests map.
    pub fn new() -> Self {
        Pending(DashMap::new())
    }

    /// Routes the given response to the pending request with the matching ID.
    ///
    /// The corresponding `.wait()` future will then resolve to the given value. Responses with
    /// a `null` or unrecognized request ID are logged with a warning and discarded.
    pub fn insert(&self, r: Response) {
        match r.id() {
            Id::Null => warn!("received response with request ID of `null`, ignoring"),
            id => match self.0.entry(id.clone()) {
                Entry::Vacant(_) => warn!("received response with unknown request ID: {}", id),
                Entry::Occupied(mut entry) => {
                    // Take the longest-waiting sender, removing the whole map entry if it is
                    // the last one remaining.
                    let tx = match entry.get().len() {
                        1 => entry.remove().remove(0),
                        _ => entry.get_mut().remove(0),
                    };

                    tx.send(r).expect("receiver already dropped");
                }
            },
        }
    }

    /// Marks the given request ID as pending and waits for its corresponding response to arrive.
    ///
    /// If the same request ID is being waited upon in multiple locations, then the incoming
    /// response will be routed to one of the callers on a first-come, first-served basis. To
    /// ensure correct routing of JSON-RPC requests, each identifier value used _must_ be unique.
    pub fn wait(&self, id: Id) -> impl Future<Output = Response> + Send + 'static {
        let (tx, rx) = oneshot::channel();

        match self.0.entry(id) {
            Entry::Vacant(entry) => {
                entry.insert(vec![tx]);
            }
            Entry::Occupied(mut entry) => {
                let txs = entry.get_mut();
                txs.reserve(1); // We assume concurrent waits are rare, so reserve one by one.
                txs.push(tx);
            }
        }

        async { rx.await.expect("sender already dropped") }
    }
}

impl Debug for Pending {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // Opaque stand-in that displays only the number of senders waiting on each ID.
        #[derive(Debug)]
        struct Waiters(usize);

        let iter = self
            .0
            .iter()
            .map(|e| (e.key().clone(), Waiters(e.value().len())));

        f.debug_map().entries(iter).finish()
    }
}

#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    #[tokio::test(flavor = "current_thread")]
    async fn waits_for_client_response() {
        let pending = Pending::new();

        let id = Id::Number(1);
        let wait_fut = pending.wait(id.clone());

        let response = Response::from_ok(id, json!({}));
        pending.insert(response.clone());

        assert_eq!(wait_fut.await, response);
    }

    #[tokio::test(flavor = "current_thread")]
    async fn routes_responses_in_fifo_order() {
        let pending = Pending::new();

        let id = Id::Number(1);
        let wait_fut1 = pending.wait(id.clone());
        let wait_fut2 = pending.wait(id.clone());

        let foo = Response::from_ok(id.clone(), json!("foo"));
        let bar = Response::from_ok(id, json!("bar"));
        pending.insert(bar.clone());
        pending.insert(foo.clone());

        assert_eq!(wait_fut1.await, bar);
        assert_eq!(wait_fut2.await, foo);
    }
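
    // A minimal sanity-check sketch (assumes the inner `DashMap` is reachable as `pending.0`
    // from this child module): responses with a `null` or unregistered request ID should only
    // be logged and dropped, leaving the map of pending requests empty.
    #[tokio::test(flavor = "current_thread")]
    async fn ignores_responses_with_unknown_or_null_id() {
        let pending = Pending::new();

        // No `.wait()` was registered for this ID, so this response is discarded with a warning.
        pending.insert(Response::from_ok(Id::Number(42), json!({})));
        // Responses with a `null` ID are likewise discarded.
        pending.insert(Response::from_ok(Id::Null, json!({})));

        assert!(pending.0.is_empty());
    }

    // Another hedged sketch: the `Debug` impl is expected to report one `Waiters(..)` entry per
    // pending ID (the exact `Id` formatting is deliberately not asserted here).
    #[tokio::test(flavor = "current_thread")]
    async fn debug_output_counts_waiters_per_id() {
        let pending = Pending::new();

        let _wait_fut1 = pending.wait(Id::Number(1));
        let _wait_fut2 = pending.wait(Id::Number(1));

        assert!(format!("{:?}", pending).contains("Waiters(2)"));
    }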
}