tiberius/
query.rs

1use std::borrow::Cow;
2
3use futures_util::io::{AsyncRead, AsyncWrite};
4
5use crate::{
6    tds::{codec::RpcProcId, stream::TokenStream},
7    Client, ColumnData, ExecuteResult, IntoSql, QueryStream,
8};
9
/// A query object with bind parameters.
///
/// Create it with [`Query::new`], attach values with [`Query::bind`] and run
/// it with [`Query::execute`] or [`Query::query`], both of which consume the
/// object.
#[derive(Debug)]
pub struct Query<'a> {
    // The SQL text; borrowed or owned depending on what was given to `new`.
    sql: Cow<'a, str>,
    // The bound parameter values, in the order `bind` was called.
    params: Vec<ColumnData<'a>>,
}
16
17impl<'a> Query<'a> {
18    /// Construct a new query object with the given SQL. If the SQL is
19    /// parameterized, the given number of parameters must be bound to the
20    /// object before executing.
21    ///
22    /// The `sql` can define the parameter placement by annotating them with
23    /// `@PN`, where N is the index of the parameter, starting from `1`.
24    pub fn new(sql: impl Into<Cow<'a, str>>) -> Self {
25        Self {
26            sql: sql.into(),
27            params: Vec::new(),
28        }
29    }
30
31    /// Bind a new parameter to the query. Must be called exactly as many times
32    /// as there are parameters in the given SQL. Otherwise the query will fail
33    /// on execution.
34    pub fn bind(&mut self, param: impl IntoSql<'a> + 'a) {
35        self.params.push(param.into_sql());
36    }
37
38    /// Executes SQL statements in the SQL Server, returning the number rows
39    /// affected. Useful for `INSERT`, `UPDATE` and `DELETE` statements. See
40    /// [`Client#execute`] for a simpler API if the parameters are statically
41    /// known.
42    ///
43    /// # Example
44    ///
45    /// ```no_run
46    /// # use tiberius::{Config, Query};
47    /// # use tokio_util::compat::TokioAsyncWriteCompatExt;
48    /// # use std::env;
49    /// # #[tokio::main]
50    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
51    /// # let c_str = env::var("TIBERIUS_TEST_CONNECTION_STRING").unwrap_or(
52    /// #     "server=tcp:localhost,1433;integratedSecurity=true;TrustServerCertificate=true".to_owned(),
53    /// # );
54    /// # let config = Config::from_ado_string(&c_str)?;
55    /// # let tcp = tokio::net::TcpStream::connect(config.get_addr()).await?;
56    /// # tcp.set_nodelay(true)?;
57    /// # let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?;
58    /// let mut query = Query::new("INSERT INTO ##Test (id) VALUES (@P1), (@P2), (@P3)");
59    ///
60    /// query.bind("foo");
61    /// query.bind(2i32);
62    /// query.bind(String::from("bar"));
63    ///
64    /// let results = query.execute(&mut client).await?;
65    /// # Ok(())
66    /// # }
67    /// ```
68    ///
69    /// [`ToSql`]: trait.ToSql.html
70    /// [`FromSql`]: trait.FromSql.html
71    /// [`Client#execute`]: struct.Client.html#method.execute
72    pub async fn execute<'b, S>(self, client: &'b mut Client<S>) -> crate::Result<ExecuteResult>
73    where
74        S: AsyncRead + AsyncWrite + Unpin + Send,
75    {
76        client.connection.flush_stream().await?;
77
78        let rpc_params = Client::<S>::rpc_params(self.sql);
79
80        client
81            .rpc_perform_query(RpcProcId::ExecuteSQL, rpc_params, self.params.into_iter())
82            .await?;
83
84        ExecuteResult::new(&mut client.connection).await
85    }
86
87    /// Executes SQL statements in the SQL Server, returning resulting rows.
88    /// Useful for `SELECT` statements. See [`Client#query`] for a simpler API
89    /// if the parameters are statically known.
90    ///
91    /// # Example
92    ///
93    /// ```
94    /// # use tiberius::{Config, Query};
95    /// # use tokio_util::compat::TokioAsyncWriteCompatExt;
96    /// # use std::env;
97    /// # #[tokio::main]
98    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
99    /// # let c_str = env::var("TIBERIUS_TEST_CONNECTION_STRING").unwrap_or(
100    /// #     "server=tcp:localhost,1433;integratedSecurity=true;TrustServerCertificate=true".to_owned(),
101    /// # );
102    /// # let config = Config::from_ado_string(&c_str)?;
103    /// # let tcp = tokio::net::TcpStream::connect(config.get_addr()).await?;
104    /// # tcp.set_nodelay(true)?;
105    /// # let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?;
106    /// let mut query = Query::new("SELECT @P1, @P2, @P3");
107    ///
108    /// query.bind(1i32);
109    /// query.bind(2i32);
110    /// query.bind(3i32);
111    ///
112    /// let stream = query.query(&mut client).await?;
113    /// # Ok(())
114    /// # }
115    /// ```
116    ///
117    /// [`QueryStream`]: struct.QueryStream.html
118    /// [`ToSql`]: trait.ToSql.html
119    /// [`FromSql`]: trait.FromSql.html
120    /// [`Client#query`]: struct.Client.html#method.query
121    pub async fn query<'b, S>(self, client: &'b mut Client<S>) -> crate::Result<QueryStream<'b>>
122    where
123        S: AsyncRead + AsyncWrite + Unpin + Send,
124    {
125        client.connection.flush_stream().await?;
126        let rpc_params = Client::<S>::rpc_params(self.sql);
127
128        client
129            .rpc_perform_query(RpcProcId::ExecuteSQL, rpc_params, self.params.into_iter())
130            .await?;
131
132        let ts = TokenStream::new(&mut client.connection);
133        let mut result = QueryStream::new(ts.try_unfold());
134        result.forward_to_metadata().await?;
135
136        Ok(result)
137    }
138}