domain/net/client/
dgram_stream.rs1use crate::base::Message;
7use crate::net::client::dgram;
8use crate::net::client::multi_stream;
9use crate::net::client::protocol::{
10 AsyncConnect, AsyncDgramRecv, AsyncDgramSend,
11};
12use crate::net::client::request::{
13 ComposeRequest, Error, GetResponse, SendRequest,
14};
15use bytes::Bytes;
16use std::boxed::Box;
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::Arc;
21
22#[derive(Clone, Debug, Default)]
26pub struct Config {
27 dgram: dgram::Config,
29
30 multi_stream: multi_stream::Config,
32}
33
34impl Config {
35 pub fn new() -> Self {
37 Default::default()
38 }
39
40 pub fn from_parts(
42 dgram: dgram::Config,
43 multi_stream: multi_stream::Config,
44 ) -> Self {
45 Self {
46 dgram,
47 multi_stream,
48 }
49 }
50
51 pub fn dgram(&self) -> &dgram::Config {
53 &self.dgram
54 }
55
56 pub fn dgram_mut(&mut self) -> &mut dgram::Config {
58 &mut self.dgram
59 }
60
61 pub fn set_dgram(&mut self, dgram: dgram::Config) {
63 self.dgram = dgram
64 }
65
66 pub fn stream(&self) -> &multi_stream::Config {
68 &self.multi_stream
69 }
70
71 pub fn stream_mut(&mut self) -> &mut multi_stream::Config {
73 &mut self.multi_stream
74 }
75
76 pub fn set_stream(&mut self, stream: multi_stream::Config) {
78 self.multi_stream = stream
79 }
80}
81
82#[derive(Clone, Debug)]
87pub struct Connection<DgramS, Req> {
88 udp_conn: Arc<dgram::Connection<DgramS>>,
90
91 tcp_conn: multi_stream::Connection<Req>,
93}
94
95impl<DgramS, Req> Connection<DgramS, Req>
96where
97 DgramS: AsyncConnect + Clone + Send + Sync + 'static,
98 DgramS::Connection:
99 AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin + 'static,
100{
101 pub fn new<StreamS>(
103 dgram_remote: DgramS,
104 stream_remote: StreamS,
105 ) -> (Self, multi_stream::Transport<StreamS, Req>) {
106 Self::with_config(dgram_remote, stream_remote, Default::default())
107 }
108
109 pub fn with_config<StreamS>(
111 dgram_remote: DgramS,
112 stream_remote: StreamS,
113 config: Config,
114 ) -> (Self, multi_stream::Transport<StreamS, Req>) {
115 let udp_conn =
116 dgram::Connection::with_config(dgram_remote, config.dgram).into();
117 let (tcp_conn, transport) = multi_stream::Connection::with_config(
118 stream_remote,
119 config.multi_stream,
120 );
121 (Self { udp_conn, tcp_conn }, transport)
122 }
123}
124
125impl<DgramS, Req> SendRequest<Req> for Connection<DgramS, Req>
128where
129 DgramS: AsyncConnect + Clone + Debug + Send + Sync + 'static,
130 DgramS::Connection: AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin,
131 Req: ComposeRequest + Clone + 'static,
132{
133 fn send_request(
134 &self,
135 request_msg: Req,
136 ) -> Box<dyn GetResponse + Send + Sync> {
137 Box::new(Request::new(
138 request_msg,
139 self.udp_conn.clone(),
140 self.tcp_conn.clone(),
141 ))
142 }
143}
144
145#[derive(Debug)]
149pub struct Request<S, Req> {
150 request_msg: Req,
152
153 udp_conn: Arc<dgram::Connection<S>>,
155
156 tcp_conn: multi_stream::Connection<Req>,
158
159 state: QueryState,
161}
162
163#[derive(Debug)]
165enum QueryState {
166 StartUdpRequest,
168
169 GetUdpResponse(Box<dyn GetResponse + Send + Sync>),
171
172 StartTcpRequest,
174
175 GetTcpResponse(Box<dyn GetResponse + Send + Sync>),
177}
178
179impl<S, Req> Request<S, Req>
180where
181 S: AsyncConnect + Clone + Send + Sync + 'static,
182 Req: ComposeRequest + Clone + 'static,
183{
184 fn new(
188 request_msg: Req,
189 udp_conn: Arc<dgram::Connection<S>>,
190 tcp_conn: multi_stream::Connection<Req>,
191 ) -> Request<S, Req> {
192 Self {
193 request_msg,
194 udp_conn,
195 tcp_conn,
196 state: QueryState::StartUdpRequest,
197 }
198 }
199
200 async fn get_response_impl(&mut self) -> Result<Message<Bytes>, Error>
204 where
205 S::Connection: AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin,
206 {
207 loop {
208 match &mut self.state {
209 QueryState::StartUdpRequest => {
210 let msg = self.request_msg.clone();
211 let request = self.udp_conn.send_request(msg);
212 self.state = QueryState::GetUdpResponse(request);
213 continue;
214 }
215 QueryState::GetUdpResponse(ref mut request) => {
216 let response = request.get_response().await?;
217 if response.header().tc() {
218 self.state = QueryState::StartTcpRequest;
219 continue;
220 }
221 return Ok(response);
222 }
223 QueryState::StartTcpRequest => {
224 let msg = self.request_msg.clone();
225 let request = self.tcp_conn.send_request(msg);
226 self.state = QueryState::GetTcpResponse(request);
227 continue;
228 }
229 QueryState::GetTcpResponse(ref mut query) => {
230 let response = query.get_response().await?;
231 return Ok(response);
232 }
233 }
234 }
235 }
236}
237
238impl<S, Req> GetResponse for Request<S, Req>
239where
240 S: AsyncConnect + Clone + Debug + Send + Sync + 'static,
241 S::Connection: AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin,
242 Req: ComposeRequest + Clone + 'static,
243{
244 fn get_response(
245 &mut self,
246 ) -> Pin<
247 Box<
248 dyn Future<Output = Result<Message<Bytes>, Error>>
249 + Send
250 + Sync
251 + '_,
252 >,
253 > {
254 Box::pin(self.get_response_impl())
255 }
256}