tower_lsp/service/client/
pending.rs
1use std::fmt::{self, Debug, Formatter};
4use std::future::Future;
5
6use dashmap::{mapref::entry::Entry, DashMap};
7use futures::channel::oneshot;
8use tracing::warn;
9
10use crate::jsonrpc::{Id, Response};
11
/// Table of outstanding requests that are waiting for a response.
///
/// Maps each request [`Id`] to the sending halves of the oneshot channels whose receivers are
/// awaiting a [`Response`] with that ID. Senders are appended as waiters register and taken
/// from the front as responses arrive, so waiters sharing an ID are served oldest first.
pub struct Pending(DashMap<Id, Vec<oneshot::Sender<Response>>>);
14
15impl Pending {
16 pub fn new() -> Self {
18 Pending(DashMap::new())
19 }
20
21 pub fn insert(&self, r: Response) {
25 match r.id() {
26 Id::Null => warn!("received response with request ID of `null`, ignoring"),
27 id => match self.0.entry(id.clone()) {
28 Entry::Vacant(_) => warn!("received response with unknown request ID: {}", id),
29 Entry::Occupied(mut entry) => {
30 let tx = match entry.get().len() {
31 1 => entry.remove().remove(0),
32 _ => entry.get_mut().remove(0),
33 };
34
35 tx.send(r).expect("receiver already dropped");
36 }
37 },
38 }
39 }
40
41 pub fn wait(&self, id: Id) -> impl Future<Output = Response> + Send + 'static {
47 let (tx, rx) = oneshot::channel();
48
49 match self.0.entry(id) {
50 Entry::Vacant(entry) => {
51 entry.insert(vec![tx]);
52 }
53 Entry::Occupied(mut entry) => {
54 let txs = entry.get_mut();
55 txs.reserve(1); txs.push(tx);
57 }
58 }
59
60 async { rx.await.expect("sender already dropped") }
61 }
62}
63
64impl Debug for Pending {
65 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
66 #[derive(Debug)]
67 struct Waiters(usize);
68
69 let iter = self
70 .0
71 .iter()
72 .map(|e| (e.key().clone(), Waiters(e.value().len())));
73
74 f.debug_map().entries(iter).finish()
75 }
76}
77
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// A single waiter receives the response inserted for its request ID.
    #[tokio::test(flavor = "current_thread")]
    async fn waits_for_client_response() {
        let table = Pending::new();
        let request_id = Id::Number(1);

        let waiter = table.wait(request_id.clone());

        let expected = Response::from_ok(request_id, json!({}));
        table.insert(expected.clone());

        assert_eq!(waiter.await, expected);
    }

    /// Two waiters on the same request ID are answered in the order they registered.
    #[tokio::test(flavor = "current_thread")]
    async fn routes_responses_in_fifo_order() {
        let table = Pending::new();
        let request_id = Id::Number(1);

        let first_waiter = table.wait(request_id.clone());
        let second_waiter = table.wait(request_id.clone());

        let first_response = Response::from_ok(request_id.clone(), json!("bar"));
        let second_response = Response::from_ok(request_id, json!("foo"));
        table.insert(first_response.clone());
        table.insert(second_response.clone());

        assert_eq!(first_waiter.await, first_response);
        assert_eq!(second_waiter.await, second_response);
    }
}