1#![forbid(unsafe_code)]
2
3use crate::{
4 constants,
5 request::{Fwd, Request, SessionZeroCopy},
6 shutdown_mux_master::shutdown_mux_master_from,
7 utils::{serialize_u32, SliceExt},
8 Error, ErrorExt, EstablishedSession, Response, Result, Session, Socket,
9};
10
11use std::{
12 borrow::Cow,
13 convert::TryInto,
14 io,
15 io::IoSlice,
16 num::{NonZeroU32, Wrapping},
17 os::unix::io::RawFd,
18 path::Path,
19};
20
21use sendfd::SendWithFd;
22use serde::{de::DeserializeOwned, Serialize};
23use ssh_format::{from_bytes, Serializer};
24use tokio::net::UnixStream;
25use tokio_io_utility::{read_to_vec_rng, write_vectored_all};
26
/// Direction of a port forwarding requested through [`Connection`].
///
/// `Connection::request_port_forward` and `Connection::close_port_forward`
/// translate this into the matching `Fwd` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ForwardType {
    /// Translated into `Fwd::Local`
    /// (presumably the equivalent of `ssh -L` -- confirm against the protocol).
    Local,
    /// Translated into `Fwd::Remote`
    /// (presumably the equivalent of `ssh -R` -- confirm against the protocol).
    Remote,
}
32
33#[derive(Debug)]
37pub struct Connection {
38 raw_conn: UnixStream,
39 serializer: Serializer,
40 read_buffer: Vec<u8>,
41 request_id: Wrapping<u32>,
42}
43impl Connection {
44 fn reset_serializer(&mut self) {
45 self.serializer.reset_counter();
46 self.serializer.output.clear();
47 }
48
49 async fn write(&mut self, value: &Request) -> Result<()> {
50 self.reset_serializer();
51 value.serialize(&mut self.serializer)?;
52
53 let header = self.serializer.create_header(0)?;
54
55 write_vectored_all(
56 &mut self.raw_conn,
57 &mut [IoSlice::new(&header), IoSlice::new(&self.serializer.output)],
58 )
59 .await?;
60
61 Ok(())
62 }
63
64 fn deserialize<T: DeserializeOwned>(read_buffer: &[u8]) -> Result<T> {
65 Ok(from_bytes(read_buffer)?.0)
67 }
68
69 pub(crate) async fn read_response(&mut self) -> Result<Response> {
70 let buffer = &mut self.read_buffer;
71
72 if buffer.len() < 4 {
73 let n = 4 - buffer.len();
74 read_to_vec_rng(&mut self.raw_conn, buffer, n..).await?;
75 }
76
77 let packet_len: u32 = Self::deserialize(&buffer[..4])?;
79
80 let packet_len: usize = packet_len.try_into().unwrap();
81
82 let buffer_len = buffer.len() - 4;
84
85 if buffer_len < packet_len {
86 let n = packet_len - buffer_len;
88 read_to_vec_rng(&mut self.raw_conn, buffer, n..).await?;
89 }
90
91 let response = Self::deserialize(&buffer[4..(4 + packet_len)])?;
93
94 buffer.drain(..(4 + packet_len));
96
97 Ok(response)
98 }
99
100 async fn send_with_fds(&self, fds: &[RawFd]) -> Result<()> {
102 let byte = &[0];
103
104 loop {
105 self.raw_conn.writable().await?;
106
107 match SendWithFd::send_with_fd(&self.raw_conn, byte, fds) {
109 Ok(n) => {
110 if n == 1 {
111 break Ok(());
112 } else {
113 debug_assert_eq!(n, 0);
114 break Err(io::Error::from(io::ErrorKind::UnexpectedEof).into());
115 }
116 }
117 Err(e) => {
118 if e.kind() != io::ErrorKind::WouldBlock {
119 break Err(e.into());
120 }
121 }
122 }
123 }
124 }
125
126 fn get_request_id(&mut self) -> u32 {
127 let request_id = self.request_id.0;
128 self.request_id += Wrapping(1);
129
130 request_id
131 }
132
133 fn check_response_id(request_id: u32, response_id: u32) -> Result<()> {
134 if request_id != response_id {
135 Err(Error::UnmatchedRequestId)
136 } else {
137 Ok(())
138 }
139 }
140
141 async fn exchange_hello(mut self) -> Result<Self> {
142 self.write(&Request::Hello {
143 version: constants::SSHMUX_VER,
144 })
145 .await?;
146
147 let response = self.read_response().await?;
148 if let Response::Hello { version } = response {
149 if version != constants::SSHMUX_VER {
150 Err(Error::UnsupportedMuxProtocol)
151 } else {
152 Ok(self)
153 }
154 } else {
155 Err(Error::invalid_server_response(&"Hello message", &response))
156 }
157 }
158
159 pub async fn connect<P: AsRef<Path>>(path: P) -> Result<Self> {
160 Self {
161 raw_conn: UnixStream::connect(path).await?,
162 serializer: Serializer::new(Vec::with_capacity(36)),
166 read_buffer: Vec::with_capacity(32),
168 request_id: Wrapping(0),
169 }
170 .exchange_hello()
171 .await
172 }
173
174 pub async fn send_alive_check(&mut self) -> Result<NonZeroU32> {
177 let request_id = self.get_request_id();
178
179 self.write(&Request::AliveCheck { request_id }).await?;
180
181 let response = self.read_response().await?;
182 if let Response::Alive {
183 response_id,
184 server_pid,
185 } = response
186 {
187 Self::check_response_id(request_id, response_id)?;
188 NonZeroU32::new(server_pid).ok_or(Error::InvalidPid)
189 } else {
190 Err(Error::invalid_server_response(
191 &"Response::Alive",
192 &response,
193 ))
194 }
195 }
196
197 async fn open_new_session_impl(
199 &mut self,
200 session: &Session<'_>,
201 fds: &[RawFd; 3],
202 ) -> Result<u32> {
203 use Response::*;
204
205 let request_id = self.get_request_id();
206
207 let term = session.term.as_ref().into_inner();
209 let cmd = session.cmd.as_ref().into_inner();
210
211 let term_len: u32 = term.get_len_as_u32()?;
212 let cmd_len: u32 = cmd.get_len_as_u32()?;
213
214 let request = Request::NewSession {
215 request_id,
216 session: SessionZeroCopy {
217 tty: session.tty,
218 x11_forwarding: session.x11_forwarding,
219 agent: session.agent,
220 subsystem: session.subsystem,
221 escape_ch: session.escape_ch,
222 },
223 };
224
225 self.reset_serializer();
227
228 request.serialize(&mut self.serializer)?;
229 let serialized_header = self.serializer.create_header(
230 4 + term_len + 4 + cmd_len,
231 )?;
232
233 let serialized_cmd_len = serialize_u32(cmd_len);
234 let serialized_term_len = serialize_u32(term_len);
235
236 let mut io_slices = [
238 IoSlice::new(&serialized_header),
239 IoSlice::new(&self.serializer.output),
240 IoSlice::new(&serialized_term_len),
241 IoSlice::new(term),
242 IoSlice::new(&serialized_cmd_len),
243 IoSlice::new(cmd),
244 ];
245
246 write_vectored_all(&mut self.raw_conn, &mut io_slices).await?;
247
248 for fd in fds {
249 self.send_with_fds(&[*fd]).await?;
250 }
251
252 let session_id = match self.read_response().await? {
253 SessionOpened {
254 response_id,
255 session_id,
256 } => {
257 Self::check_response_id(request_id, response_id)?;
258 session_id
259 }
260 PermissionDenied {
261 response_id,
262 reason,
263 } => {
264 Self::check_response_id(request_id, response_id)?;
265 return Err(Error::PermissionDenied(reason));
266 }
267 Failure {
268 response_id,
269 reason,
270 } => {
271 Self::check_response_id(request_id, response_id)?;
272 return Err(Error::RequestFailure(reason));
273 }
274 response => {
275 return Err(Error::invalid_server_response(
276 &"SessionOpened, PermissionDenied or Failure",
277 &response,
278 ))
279 }
280 };
281
282 Result::Ok(session_id)
283 }
284
285 pub async fn open_new_session(
297 mut self,
298 session: &Session<'_>,
299 fds: &[RawFd; 3],
300 ) -> Result<EstablishedSession> {
301 let session_id = self.open_new_session_impl(session, fds).await?;
302
303 self.serializer.output = Vec::new();
306
307 Ok(EstablishedSession {
308 conn: self,
309 session_id,
310 })
311 }
312
313 pub async fn sftp(self, fds: &[RawFd; 3]) -> Result<EstablishedSession> {
316 let session = Session::builder()
317 .subsystem(true)
318 .term(Cow::Borrowed("".try_into().unwrap()))
319 .cmd(Cow::Borrowed("sftp".try_into().unwrap()))
320 .build();
321
322 self.open_new_session(&session, fds).await
323 }
324
325 async fn send_fwd_request(&mut self, request_id: u32, fwd: &Fwd<'_>) -> Result<()> {
326 let (fwd_mode, listen_socket, connect_socket) = fwd.as_serializable();
327 let (listen_addr, listen_port) = listen_socket.as_serializable();
328 let (connect_addr, connect_port) = connect_socket.as_ref().as_serializable();
329
330 let serialized_listen_port = serialize_u32(listen_port);
331 let serialized_connect_port = serialize_u32(connect_port);
332
333 let listen_addr_len: u32 = listen_addr.get_len_as_u32()?;
334 let connect_addr_len: u32 = connect_addr.get_len_as_u32()?;
335
336 let request = Request::OpenFwd {
337 request_id,
338 fwd_mode,
339 };
340
341 self.reset_serializer();
343
344 request.serialize(&mut self.serializer)?;
345 let serialized_header = self.serializer.create_header(
346 4 +
348 listen_addr_len +
349 4 +
351 4 +
353 connect_addr_len
354 + 4,
356 )?;
357
358 let serialized_listen_addr_len = serialize_u32(listen_addr_len);
359 let serialized_connect_addr_len = serialize_u32(connect_addr_len);
360
361 let mut io_slices = [
363 IoSlice::new(&serialized_header),
364 IoSlice::new(&self.serializer.output),
365 IoSlice::new(&serialized_listen_addr_len),
366 IoSlice::new(listen_addr.into_inner()),
367 IoSlice::new(&serialized_listen_port),
368 IoSlice::new(&serialized_connect_addr_len),
369 IoSlice::new(connect_addr.into_inner()),
370 IoSlice::new(&serialized_connect_port),
371 ];
372
373 write_vectored_all(&mut self.raw_conn, &mut io_slices).await?;
374
375 Ok(())
376 }
377
378 async fn send_close_fwd_request(&mut self, request_id: u32, fwd: &Fwd<'_>) -> Result<()> {
379 let (fwd_mode, listen_socket, connect_socket) = fwd.as_serializable();
380 let (listen_addr, listen_port) = listen_socket.as_serializable();
381 let (connect_addr, connect_port) = connect_socket.as_ref().as_serializable();
382
383 let serialized_listen_port = serialize_u32(listen_port);
384 let serialized_connect_port = serialize_u32(connect_port);
385
386 let listen_addr_len: u32 = listen_addr.get_len_as_u32()?;
387 let connect_addr_len: u32 = connect_addr.get_len_as_u32()?;
388
389 let request = Request::CloseFwd {
390 request_id,
391 fwd_mode,
392 };
393
394 self.reset_serializer();
396
397 request.serialize(&mut self.serializer)?;
398 let serialized_header = self.serializer.create_header(
399 4 +
401 listen_addr_len +
402 4 +
404 4 +
406 connect_addr_len
407 + 4,
409 )?;
410
411 let serialized_listen_addr_len = serialize_u32(listen_addr_len);
412 let serialized_connect_addr_len = serialize_u32(connect_addr_len);
413
414 let mut io_slices = [
416 IoSlice::new(&serialized_header),
417 IoSlice::new(&self.serializer.output),
418 IoSlice::new(&serialized_listen_addr_len),
419 IoSlice::new(listen_addr.into_inner()),
420 IoSlice::new(&serialized_listen_port),
421 IoSlice::new(&serialized_connect_addr_len),
422 IoSlice::new(connect_addr.into_inner()),
423 IoSlice::new(&serialized_connect_port),
424 ];
425
426 write_vectored_all(&mut self.raw_conn, &mut io_slices).await?;
427
428 Ok(())
429 }
430
431 pub async fn request_port_forward(
433 &mut self,
434 forward_type: ForwardType,
435 listen_socket: &Socket<'_>,
436 connect_socket: &Socket<'_>,
437 ) -> Result<()> {
438 use ForwardType::*;
439 use Response::*;
440
441 let fwd = match forward_type {
442 Local => Fwd::Local {
443 listen_socket,
444 connect_socket,
445 },
446 Remote => Fwd::Remote {
447 listen_socket,
448 connect_socket,
449 },
450 };
451
452 let request_id = self.get_request_id();
453 self.send_fwd_request(request_id, &fwd).await?;
454
455 match self.read_response().await? {
456 Ok { response_id } => Self::check_response_id(request_id, response_id),
457 PermissionDenied {
458 response_id,
459 reason,
460 } => {
461 Self::check_response_id(request_id, response_id)?;
462 Err(Error::PermissionDenied(reason))
463 }
464 Failure {
465 response_id,
466 reason,
467 } => {
468 Self::check_response_id(request_id, response_id)?;
469 Err(Error::RequestFailure(reason))
470 }
471 response => Err(Error::invalid_server_response(
472 &"Ok, PermissionDenied or Failure",
473 &response,
474 )),
475 }
476 }
477
478 pub async fn close_port_forward(
480 &mut self,
481 forward_type: ForwardType,
482 listen_socket: &Socket<'_>,
483 connect_socket: &Socket<'_>,
484 ) -> Result<()> {
485 use ForwardType::*;
486 use Response::*;
487
488 let fwd = match forward_type {
489 Local => Fwd::Local {
490 listen_socket,
491 connect_socket,
492 },
493 Remote => Fwd::Remote {
494 listen_socket,
495 connect_socket,
496 },
497 };
498
499 let request_id = self.get_request_id();
500 self.send_close_fwd_request(request_id, &fwd).await?;
501
502 match self.read_response().await? {
503 Ok { response_id } => Self::check_response_id(request_id, response_id),
504 PermissionDenied {
505 response_id,
506 reason,
507 } => {
508 Self::check_response_id(request_id, response_id)?;
509 Err(Error::PermissionDenied(reason))
510 }
511 Failure {
512 response_id,
513 reason,
514 } => {
515 Self::check_response_id(request_id, response_id)?;
516 Err(Error::RequestFailure(reason))
517 }
518 response => Err(Error::invalid_server_response(
519 &"Ok, PermissionDenied or Failure",
520 &response,
521 )),
522 }
523 }
524
525 pub async fn request_dynamic_forward(
527 &mut self,
528 listen_socket: &Socket<'_>,
529 ) -> Result<NonZeroU32> {
530 use Response::*;
531
532 let fwd = Fwd::Dynamic { listen_socket };
533
534 let request_id = self.get_request_id();
535 self.send_fwd_request(request_id, &fwd).await?;
536
537 match self.read_response().await? {
538 RemotePort {
539 response_id,
540 remote_port,
541 } => {
542 Self::check_response_id(request_id, response_id)?;
543 NonZeroU32::new(remote_port).ok_or(Error::InvalidPort)
544 }
545 PermissionDenied {
546 response_id,
547 reason,
548 } => {
549 Self::check_response_id(request_id, response_id)?;
550 Err(Error::PermissionDenied(reason))
551 }
552 Failure {
553 response_id,
554 reason,
555 } => {
556 Self::check_response_id(request_id, response_id)?;
557 Err(Error::RequestFailure(reason))
558 }
559 response => Err(Error::invalid_server_response(
560 &"RemotePort, PermissionDenied or Failure",
561 &response,
562 )),
563 }
564 }
565
566 pub async fn request_stop_listening(&mut self) -> Result<()> {
569 use Response::*;
570
571 let request_id = self.get_request_id();
572 self.write(&Request::StopListening { request_id }).await?;
573
574 match self.read_response().await? {
575 Ok { response_id } => {
576 Self::check_response_id(request_id, response_id)?;
577 Result::Ok(())
578 }
579 PermissionDenied {
580 response_id,
581 reason,
582 } => {
583 Self::check_response_id(request_id, response_id)?;
584 Err(Error::PermissionDenied(reason))
585 }
586 Failure {
587 response_id,
588 reason,
589 } => {
590 Self::check_response_id(request_id, response_id)?;
591 Err(Error::RequestFailure(reason))
592 }
593 response => Err(Error::invalid_server_response(
594 &"Ok, PermissionDenied or Failure",
595 &response,
596 )),
597 }
598 }
599
600 pub fn request_stop_listening_sync(self) -> Result<()> {
605 shutdown_mux_master_from(self.raw_conn.into_std()?)
606 }
607}
608
609#[cfg(test)]
610mod tests {
611 use super::*;
612 use crate::SessionStatus;
613
614 use std::convert::TryInto;
615 use std::env;
616 use std::io;
617 use std::os::unix::io::AsRawFd;
618 use std::time::Duration;
619
620 use tokio::io::{AsyncReadExt, AsyncWriteExt};
621 use tokio::net::{TcpListener, TcpStream};
622 use tokio::time::sleep;
623
624 use tokio_pipe::{pipe, PipeRead, PipeWrite};
625
626 const PATH: &str = "/tmp/openssh-mux-client-test.socket";
627
628 macro_rules! run_test {
629 ( $test_name:ident, $func:ident ) => {
630 #[tokio::test(flavor = "current_thread")]
631 async fn $test_name() {
632 $func(Connection::connect(PATH).await.unwrap()).await;
633 }
634 };
635 }
636
637 macro_rules! run_test2 {
638 ( $test_name:ident, $func:ident ) => {
639 #[tokio::test(flavor = "current_thread")]
640 async fn $test_name() {
641 $func(
642 Connection::connect(PATH).await.unwrap(),
643 Connection::connect(PATH).await.unwrap(),
644 )
645 .await;
646 }
647 };
648 }
649
650 async fn test_connect_impl(_conn: Connection) {}
651 run_test!(test_unordered_connect, test_connect_impl);
652
653 async fn test_alive_check_impl(mut conn: Connection) {
654 let expected_pid = env::var("ControlMasterPID").unwrap();
655 let expected_pid: u32 = expected_pid.parse().unwrap();
656
657 let actual_pid = conn.send_alive_check().await.unwrap().get();
658 assert_eq!(expected_pid, actual_pid);
659 }
660 run_test!(test_unordered_alive_check, test_alive_check_impl);
661
662 async fn test_roundtrip<const SIZE: usize>(
663 stdios: &mut (PipeWrite, PipeRead),
664 data: &'static [u8; SIZE],
665 ) {
666 stdios.0.write_all(data).await.unwrap();
667
668 let mut buffer = [0_u8; SIZE];
669 stdios.1.read_exact(&mut buffer).await.unwrap();
670
671 assert_eq!(data, &buffer);
672 }
673
674 async fn create_remote_process(
675 conn: Connection,
676 cmd: &str,
677 ) -> (EstablishedSession, (PipeWrite, PipeRead)) {
678 let session = Session::builder()
679 .cmd(Cow::Borrowed(cmd.try_into().unwrap()))
680 .build();
681
682 let (stdin_read, stdin_write) = pipe().unwrap();
684 let (stdout_read, stdout_write) = pipe().unwrap();
685
686 let established_session = conn
687 .open_new_session(
688 &session,
689 &[
690 stdin_read.as_raw_fd(),
691 stdout_write.as_raw_fd(),
692 io::stderr().as_raw_fd(),
693 ],
694 )
695 .await
696 .unwrap();
697
698 (established_session, (stdin_write, stdout_read))
699 }
700
701 async fn test_open_new_session_impl(conn: Connection) {
702 let (established_session, mut stdios) = create_remote_process(conn, "/bin/cat").await;
703
704 test_roundtrip(&mut stdios, b"0134131dqwdqdx13as\n").await;
708 test_roundtrip(&mut stdios, b"Whats' Up?\n").await;
709
710 drop(stdios);
711
712 let session_status = established_session.wait().await.unwrap();
713 assert_matches!(
714 session_status,
715 SessionStatus::Exited { exit_value, .. }
716 if exit_value.unwrap() == 0
717 );
718 }
719 run_test!(test_unordered_open_new_session, test_open_new_session_impl);
720
721 async fn test_remote_socket_forward_impl(mut conn0: Connection, mut conn1: Connection) {
722 let path = Path::new("/tmp/openssh-remote-forward.socket");
723
724 let output_listener = TcpListener::bind(("127.0.0.1", 1234)).await.unwrap();
725
726 eprintln!("Requesting port forward");
727 conn0
728 .request_port_forward(
729 ForwardType::Remote,
730 &Socket::UnixSocket { path: path.into() },
731 &Socket::TcpSocket {
732 port: 1234,
733 host: "127.0.0.1".into(),
734 },
735 )
736 .await
737 .unwrap();
738
739 eprintln!("Creating remote process");
740 let cmd = format!("/usr/bin/socat OPEN:/data,rdonly UNIX-CONNECT:{:#?}", path);
741 let (established_session, stdios) = create_remote_process(conn0, &cmd).await;
742
743 eprintln!("Waiting for connection");
744 let (mut output, _addr) = output_listener.accept().await.unwrap();
745
746 eprintln!("Reading");
747
748 const DATA: &[u8] = "0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n".as_bytes();
749
750 let mut buffer = [0_u8; DATA.len()];
751 output.read_exact(&mut buffer).await.unwrap();
752
753 assert_eq!(DATA, &buffer);
754
755 eprintln!("Closing port forward");
756 conn1
757 .close_port_forward(
758 ForwardType::Remote,
759 &Socket::UnixSocket { path: path.into() },
760 &Socket::TcpSocket {
761 port: 1234,
762 host: "127.0.0.1".into(),
763 },
764 )
765 .await
766 .unwrap();
767
768 eprintln!("Checking whether the forwarded socket is closed");
769 assert_eq!(output.read(&mut buffer).await.unwrap(), 0);
770
771 drop(output_listener);
772 drop(output);
773 drop(stdios);
774
775 eprintln!("Waiting for session to end");
776 let session_status = established_session.wait().await.unwrap();
777 assert_matches!(
778 session_status,
779 SessionStatus::Exited { exit_value, .. }
780 if exit_value.unwrap() == 0
781 );
782 }
783 run_test2!(
784 test_unordered_remote_socket_forward,
785 test_remote_socket_forward_impl
786 );
787
788 async fn test_local_socket_forward_impl(conn0: Connection, mut conn1: Connection) {
789 let path: Cow<'_, _> = Path::new("/tmp/openssh-local-forward.socket").into();
790
791 eprintln!("Creating remote process");
792 let cmd = format!("socat -u OPEN:/data UNIX-LISTEN:{:#?} >/dev/stderr", path);
793 let (established_session, stdios) = create_remote_process(conn0, &cmd).await;
794
795 sleep(Duration::from_secs(1)).await;
796
797 eprintln!("Requesting port forward");
798 conn1
799 .request_port_forward(
800 ForwardType::Local,
801 &Socket::TcpSocket {
802 port: 1235,
803 host: "127.0.0.1".into(),
804 },
805 &Socket::UnixSocket { path: path.clone() },
806 )
807 .await
808 .unwrap();
809
810 eprintln!("Connecting to forwarded socket");
811 let mut output = TcpStream::connect(("127.0.0.1", 1235)).await.unwrap();
812
813 eprintln!("Reading");
814
815 const DATA: &[u8] = "0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n".as_bytes();
816 let mut buffer = [0_u8; DATA.len()];
817 output.read_exact(&mut buffer).await.unwrap();
818
819 assert_eq!(DATA, buffer);
820
821 eprintln!("Closing port forward");
822 conn1
823 .close_port_forward(
824 ForwardType::Local,
825 &Socket::TcpSocket {
826 port: 1235,
827 host: "127.0.0.1".into(),
828 },
829 &Socket::UnixSocket { path },
830 )
831 .await
832 .unwrap();
833
834 eprintln!("Checking whether the forwarded socket is closed");
835 assert_eq!(output.read(&mut buffer).await.unwrap(), 0);
836
837 drop(output);
838 drop(stdios);
839
840 eprintln!("Waiting for session to end");
841 let session_status = established_session.wait().await.unwrap();
842 assert_matches!(
843 session_status,
844 SessionStatus::Exited { exit_value, .. }
845 if exit_value.unwrap() == 0
846 );
847 }
848 run_test2!(
849 test_unordered_local_socket_forward,
850 test_local_socket_forward_impl
851 );
852
853 async fn test_request_stop_listening_impl(mut conn: Connection) {
854 conn.request_stop_listening().await.unwrap();
855
856 eprintln!("Verify that existing connection is still usable.");
857 test_open_new_session_impl(conn).await;
858
859 eprintln!(
860 "Verify that after the last connection is dropped, the multiplex server \
861 indeed shutdown."
862 );
863 assert_matches!(Connection::connect(PATH).await, Err(_));
864 }
865 run_test!(
866 test_request_stop_listening,
867 test_request_stop_listening_impl
868 );
869}