tower_lsp/service/client/
socket.rs

1//! Loopback connection to the language client.
2
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use futures::channel::mpsc::Receiver;
8use futures::sink::Sink;
9use futures::stream::{FusedStream, Stream, StreamExt};
10
11use super::{ExitedError, Pending, ServerState, State};
12use crate::jsonrpc::{Request, Response};
13
14/// A loopback channel for server-to-client communication.
15#[derive(Debug)]
16pub struct ClientSocket {
17    pub(super) rx: Receiver<Request>,
18    pub(super) pending: Arc<Pending>,
19    pub(super) state: Arc<ServerState>,
20}
21
22impl ClientSocket {
23    /// Splits this `ClientSocket` into two halves capable of operating independently.
24    ///
25    /// The two halves returned implement the [`Stream`] and [`Sink`] traits, respectively.
26    ///
27    /// [`Stream`]: futures::Stream
28    /// [`Sink`]: futures::Sink
29    pub fn split(self) -> (RequestStream, ResponseSink) {
30        let ClientSocket { rx, pending, state } = self;
31        let state_ = state.clone();
32
33        (
34            RequestStream { rx, state: state_ },
35            ResponseSink { pending, state },
36        )
37    }
38}
39
40/// Yields a stream of pending server-to-client requests.
41impl Stream for ClientSocket {
42    type Item = Request;
43
44    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45        if self.state.get() == State::Exited || self.rx.is_terminated() {
46            Poll::Ready(None)
47        } else {
48            self.rx.poll_next_unpin(cx)
49        }
50    }
51
52    fn size_hint(&self) -> (usize, Option<usize>) {
53        self.rx.size_hint()
54    }
55}
56
57impl FusedStream for ClientSocket {
58    fn is_terminated(&self) -> bool {
59        self.rx.is_terminated()
60    }
61}
62
63/// Routes client-to-server responses back to the server.
64impl Sink<Response> for ClientSocket {
65    type Error = ExitedError;
66
67    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
68        if self.state.get() == State::Exited || self.rx.is_terminated() {
69            Poll::Ready(Err(ExitedError(())))
70        } else {
71            Poll::Ready(Ok(()))
72        }
73    }
74
75    fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
76        self.pending.insert(item);
77        Ok(())
78    }
79
80    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81        Poll::Ready(Ok(()))
82    }
83
84    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85        Poll::Ready(Ok(()))
86    }
87}
88
89/// Yields a stream of pending server-to-client requests.
90#[derive(Debug)]
91#[must_use = "streams do nothing unless polled"]
92pub struct RequestStream {
93    rx: Receiver<Request>,
94    state: Arc<ServerState>,
95}
96
97impl Stream for RequestStream {
98    type Item = Request;
99
100    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101        if self.state.get() == State::Exited || self.rx.is_terminated() {
102            Poll::Ready(None)
103        } else {
104            self.rx.poll_next_unpin(cx)
105        }
106    }
107
108    fn size_hint(&self) -> (usize, Option<usize>) {
109        self.rx.size_hint()
110    }
111}
112
113impl FusedStream for RequestStream {
114    fn is_terminated(&self) -> bool {
115        self.rx.is_terminated()
116    }
117}
118
119/// Routes client-to-server responses back to the server.
120#[derive(Debug)]
121pub struct ResponseSink {
122    pending: Arc<Pending>,
123    state: Arc<ServerState>,
124}
125
126impl Sink<Response> for ResponseSink {
127    type Error = ExitedError;
128
129    fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
130        if self.state.get() == State::Exited {
131            Poll::Ready(Err(ExitedError(())))
132        } else {
133            Poll::Ready(Ok(()))
134        }
135    }
136
137    fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
138        self.pending.insert(item);
139        Ok(())
140    }
141
142    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
143        Poll::Ready(Ok(()))
144    }
145
146    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147        Poll::Ready(Ok(()))
148    }
149}