thrift/server/
threaded.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use log::warn;
19
20use std::net::{TcpListener, ToSocketAddrs};
21use std::sync::Arc;
22use threadpool::ThreadPool;
23
24#[cfg(unix)]
25use std::os::unix::net::UnixListener;
26#[cfg(unix)]
27use std::path::Path;
28
29use crate::protocol::{
30    TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory,
31};
32use crate::transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory};
33use crate::{ApplicationError, ApplicationErrorKind};
34
35use super::TProcessor;
36use crate::TransportErrorKind;
37
38/// Fixed-size thread-pool blocking Thrift server.
39///
40/// A `TServer` listens on a given address and submits accepted connections
41/// to an **unbounded** queue. Connections from this queue are serviced by
42/// the first available worker thread from a **fixed-size** thread pool. Each
43/// accepted connection is handled by that worker thread, and communication
44/// over this thread occurs sequentially and synchronously (i.e. calls block).
45/// Accepted connections have an input half and an output half, each of which
46/// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate
47/// messages to and from byes. Any combination of `TInputProtocol`, `TOutputProtocol`
48/// and `TTransport` may be used.
49///
50/// # Examples
51///
52/// Creating and running a `TServer` using Thrift-compiler-generated
53/// service code.
54///
55/// ```no_run
56/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
57/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
58/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
59/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory,
60///                         TReadTransportFactory, TWriteTransportFactory};
61/// use thrift::server::{TProcessor, TServer};
62///
63/// //
64/// // auto-generated
65/// //
66///
67/// // processor for `SimpleService`
68/// struct SimpleServiceSyncProcessor;
69/// impl SimpleServiceSyncProcessor {
70///     fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
71///         unimplemented!();
72///     }
73/// }
74///
75/// // `TProcessor` implementation for `SimpleService`
76/// impl TProcessor for SimpleServiceSyncProcessor {
77///     fn process(&self, i: &mut dyn TInputProtocol, o: &mut dyn TOutputProtocol) -> thrift::Result<()> {
78///         unimplemented!();
79///     }
80/// }
81///
82/// // service functions for SimpleService
83/// trait SimpleServiceSyncHandler {
84///     fn service_call(&self) -> thrift::Result<()>;
85/// }
86///
87/// //
88/// // user-code follows
89/// //
90///
91/// // define a handler that will be invoked when `service_call` is received
92/// struct SimpleServiceHandlerImpl;
93/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
94///     fn service_call(&self) -> thrift::Result<()> {
95///         unimplemented!();
96///     }
97/// }
98///
99/// // instantiate the processor
100/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
101///
102/// // instantiate the server
103/// let i_tr_fact: Box<dyn TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new());
104/// let i_pr_fact: Box<dyn TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
105/// let o_tr_fact: Box<dyn TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new());
106/// let o_pr_fact: Box<dyn TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
107///
108/// let mut server = TServer::new(
109///     i_tr_fact,
110///     i_pr_fact,
111///     o_tr_fact,
112///     o_pr_fact,
113///     processor,
114///     10
115/// );
116///
117/// // start listening for incoming connections
118/// match server.listen("127.0.0.1:8080") {
119///   Ok(_)  => println!("listen completed"),
120///   Err(e) => println!("listen failed with error {:?}", e),
121/// }
122/// ```
123#[derive(Debug)]
124pub struct TServer<PRC, RTF, IPF, WTF, OPF>
125where
126    PRC: TProcessor + Send + Sync + 'static,
127    RTF: TReadTransportFactory + 'static,
128    IPF: TInputProtocolFactory + 'static,
129    WTF: TWriteTransportFactory + 'static,
130    OPF: TOutputProtocolFactory + 'static,
131{
132    r_trans_factory: RTF,
133    i_proto_factory: IPF,
134    w_trans_factory: WTF,
135    o_proto_factory: OPF,
136    processor: Arc<PRC>,
137    worker_pool: ThreadPool,
138}
139
140impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF>
141where
142    PRC: TProcessor + Send + Sync + 'static,
143    RTF: TReadTransportFactory + 'static,
144    IPF: TInputProtocolFactory + 'static,
145    WTF: TWriteTransportFactory + 'static,
146    OPF: TOutputProtocolFactory + 'static,
147{
148    /// Create a `TServer`.
149    ///
150    /// Each accepted connection has an input and output half, each of which
151    /// requires a `TTransport` and `TProtocol`. `TServer` uses
152    /// `read_transport_factory` and `input_protocol_factory` to create
153    /// implementations for the input, and `write_transport_factory` and
154    /// `output_protocol_factory` to create implementations for the output.
155    pub fn new(
156        read_transport_factory: RTF,
157        input_protocol_factory: IPF,
158        write_transport_factory: WTF,
159        output_protocol_factory: OPF,
160        processor: PRC,
161        num_workers: usize,
162    ) -> TServer<PRC, RTF, IPF, WTF, OPF> {
163        TServer {
164            r_trans_factory: read_transport_factory,
165            i_proto_factory: input_protocol_factory,
166            w_trans_factory: write_transport_factory,
167            o_proto_factory: output_protocol_factory,
168            processor: Arc::new(processor),
169            worker_pool: ThreadPool::with_name("Thrift service processor".to_owned(), num_workers),
170        }
171    }
172
173    /// Listen for incoming connections on `listen_address`.
174    ///
175    /// `listen_address` should implement `ToSocketAddrs` trait.
176    ///
177    /// Return `()` if successful.
178    ///
179    /// Return `Err` when the server cannot bind to `listen_address` or there
180    /// is an unrecoverable error.
181    pub fn listen<A: ToSocketAddrs>(&mut self, listen_address: A) -> crate::Result<()> {
182        let listener = TcpListener::bind(listen_address)?;
183        for stream in listener.incoming() {
184            match stream {
185                Ok(s) => {
186                    let channel = TTcpChannel::with_stream(s);
187                    self.handle_stream(channel)?;
188                }
189                Err(e) => {
190                    warn!("failed to accept remote connection with error {:?}", e);
191                }
192            }
193        }
194
195        Err(crate::Error::Application(ApplicationError {
196            kind: ApplicationErrorKind::Unknown,
197            message: "aborted listen loop".into(),
198        }))
199    }
200
201    /// Listen for incoming connections on `listen_path`.
202    ///
203    /// `listen_path` should implement `AsRef<Path>` trait.
204    ///
205    /// Return `()` if successful.
206    ///
207    /// Return `Err` when the server cannot bind to `listen_path` or there
208    /// is an unrecoverable error.
209    #[cfg(unix)]
210    pub fn listen_uds<P: AsRef<Path>>(&mut self, listen_path: P) -> crate::Result<()> {
211        let listener = UnixListener::bind(listen_path)?;
212        for stream in listener.incoming() {
213            match stream {
214                Ok(s) => {
215                    self.handle_stream(s)?;
216                }
217                Err(e) => {
218                    warn!(
219                        "failed to accept connection via unix domain socket with error {:?}",
220                        e
221                    );
222                }
223            }
224        }
225
226        Err(crate::Error::Application(ApplicationError {
227            kind: ApplicationErrorKind::Unknown,
228            message: "aborted listen loop".into(),
229        }))
230    }
231
232    fn handle_stream<S: TIoChannel + Send + 'static>(&mut self, stream: S) -> crate::Result<()> {
233        let (i_prot, o_prot) = self.new_protocols_for_connection(stream)?;
234        let processor = self.processor.clone();
235        self.worker_pool
236            .execute(move || handle_incoming_connection(processor, i_prot, o_prot));
237        Ok(())
238    }
239
240    fn new_protocols_for_connection<S: TIoChannel + Send + 'static>(
241        &mut self,
242        stream: S,
243    ) -> crate::Result<(
244        Box<dyn TInputProtocol + Send>,
245        Box<dyn TOutputProtocol + Send>,
246    )> {
247        // split it into two - one to be owned by the
248        // input tran/proto and the other by the output
249        let (r_chan, w_chan) = stream.split()?;
250
251        // input protocol and transport
252        let r_tran = self.r_trans_factory.create(Box::new(r_chan));
253        let i_prot = self.i_proto_factory.create(r_tran);
254
255        // output protocol and transport
256        let w_tran = self.w_trans_factory.create(Box::new(w_chan));
257        let o_prot = self.o_proto_factory.create(w_tran);
258
259        Ok((i_prot, o_prot))
260    }
261}
262
263fn handle_incoming_connection<PRC>(
264    processor: Arc<PRC>,
265    i_prot: Box<dyn TInputProtocol>,
266    o_prot: Box<dyn TOutputProtocol>,
267) where
268    PRC: TProcessor,
269{
270    let mut i_prot = i_prot;
271    let mut o_prot = o_prot;
272    loop {
273        match processor.process(&mut *i_prot, &mut *o_prot) {
274            Ok(()) => {}
275            Err(err) => {
276                match err {
277                    crate::Error::Transport(ref transport_err)
278                        if transport_err.kind == TransportErrorKind::EndOfFile => {}
279                    other => warn!("processor completed with error: {:?}", other),
280                }
281                break;
282            }
283        }
284    }
285}