tower_lsp/service/
pending.rs
1use std::fmt::{self, Debug, Formatter};
4use std::future::Future;
5use std::sync::Arc;
6
7use dashmap::{mapref::entry::Entry, DashMap};
8use futures::future::{self, Either};
9use tracing::{debug, info};
10
11use super::ExitedError;
12use crate::jsonrpc::{Error, Id, Response};
13
14pub struct Pending(Arc<DashMap<Id, future::AbortHandle>>);
16
17impl Pending {
18 pub fn new() -> Self {
20 Pending(Arc::new(DashMap::new()))
21 }
22
23 pub fn execute<F>(
28 &self,
29 id: Id,
30 fut: F,
31 ) -> impl Future<Output = Result<Option<Response>, ExitedError>> + Send + 'static
32 where
33 F: Future<Output = Result<Option<Response>, ExitedError>> + Send + 'static,
34 {
35 if let Entry::Vacant(entry) = self.0.entry(id.clone()) {
36 let (handler_fut, abort_handle) = future::abortable(fut);
37 entry.insert(abort_handle);
38
39 let requests = self.0.clone();
40 Either::Left(async move {
41 let abort_result = handler_fut.await;
42 requests.remove(&id); if let Ok(handler_result) = abort_result {
45 handler_result
46 } else {
47 Ok(Some(Response::from_error(id, Error::request_cancelled())))
48 }
49 })
50 } else {
51 Either::Right(async { Ok(Some(Response::from_error(id, Error::invalid_request()))) })
52 }
53 }
54
55 pub fn cancel(&self, id: &Id) {
60 if let Some((_, handle)) = self.0.remove(id) {
61 handle.abort();
62 info!("successfully cancelled request with ID: {}", id);
63 } else {
64 debug!(
65 "client asked to cancel request {}, but no such pending request exists, ignoring",
66 id
67 );
68 }
69 }
70
71 pub fn cancel_all(&self) {
73 self.0.retain(|_, handle| {
74 handle.abort();
75 false
76 });
77 }
78}
79
80impl Debug for Pending {
81 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
82 f.debug_set()
83 .entries(self.0.iter().map(|entry| entry.key().clone()))
84 .finish()
85 }
86}
87
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// A handler that completes normally has its response passed through unchanged.
    #[tokio::test(flavor = "current_thread")]
    async fn executes_server_request() {
        let id = Id::Number(1);
        let expected = Response::from_ok(id.clone(), json!({}));

        let pending = Pending::new();
        let response = pending
            .execute(id.clone(), {
                let id = id.clone();
                async move { Ok(Some(Response::from_ok(id, json!({})))) }
            })
            .await;

        assert_eq!(response, Ok(Some(expected)));
    }

    /// Cancelling a handler that never completes yields a "request cancelled" error response.
    #[tokio::test(flavor = "current_thread")]
    async fn cancels_server_request() {
        let id = Id::Number(1);
        let pending = Pending::new();

        // The handler would never resolve by itself, so only cancellation can finish the task.
        let task = tokio::spawn(pending.execute(id.clone(), future::pending()));
        pending.cancel(&id);

        let outcome = task.await.expect("task panicked");
        let cancelled = Response::from_error(id, Error::request_cancelled());
        assert_eq!(outcome, Ok(Some(cancelled)));
    }
}