tiberius/query.rs
1use std::borrow::Cow;
2
3use futures_util::io::{AsyncRead, AsyncWrite};
4
5use crate::{
6 tds::{codec::RpcProcId, stream::TokenStream},
7 Client, ColumnData, ExecuteResult, IntoSql, QueryStream,
8};
9
10/// A query object with bind parameters.
11#[derive(Debug)]
12pub struct Query<'a> {
13 sql: Cow<'a, str>,
14 params: Vec<ColumnData<'a>>,
15}
16
17impl<'a> Query<'a> {
18 /// Construct a new query object with the given SQL. If the SQL is
19 /// parameterized, the given number of parameters must be bound to the
20 /// object before executing.
21 ///
22 /// The `sql` can define the parameter placement by annotating them with
23 /// `@PN`, where N is the index of the parameter, starting from `1`.
24 pub fn new(sql: impl Into<Cow<'a, str>>) -> Self {
25 Self {
26 sql: sql.into(),
27 params: Vec::new(),
28 }
29 }
30
31 /// Bind a new parameter to the query. Must be called exactly as many times
32 /// as there are parameters in the given SQL. Otherwise the query will fail
33 /// on execution.
34 pub fn bind(&mut self, param: impl IntoSql<'a> + 'a) {
35 self.params.push(param.into_sql());
36 }
37
38 /// Executes SQL statements in the SQL Server, returning the number rows
39 /// affected. Useful for `INSERT`, `UPDATE` and `DELETE` statements. See
40 /// [`Client#execute`] for a simpler API if the parameters are statically
41 /// known.
42 ///
43 /// # Example
44 ///
45 /// ```no_run
46 /// # use tiberius::{Config, Query};
47 /// # use tokio_util::compat::TokioAsyncWriteCompatExt;
48 /// # use std::env;
49 /// # #[tokio::main]
50 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
51 /// # let c_str = env::var("TIBERIUS_TEST_CONNECTION_STRING").unwrap_or(
52 /// # "server=tcp:localhost,1433;integratedSecurity=true;TrustServerCertificate=true".to_owned(),
53 /// # );
54 /// # let config = Config::from_ado_string(&c_str)?;
55 /// # let tcp = tokio::net::TcpStream::connect(config.get_addr()).await?;
56 /// # tcp.set_nodelay(true)?;
57 /// # let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?;
58 /// let mut query = Query::new("INSERT INTO ##Test (id) VALUES (@P1), (@P2), (@P3)");
59 ///
60 /// query.bind("foo");
61 /// query.bind(2i32);
62 /// query.bind(String::from("bar"));
63 ///
64 /// let results = query.execute(&mut client).await?;
65 /// # Ok(())
66 /// # }
67 /// ```
68 ///
69 /// [`ToSql`]: trait.ToSql.html
70 /// [`FromSql`]: trait.FromSql.html
71 /// [`Client#execute`]: struct.Client.html#method.execute
72 pub async fn execute<'b, S>(self, client: &'b mut Client<S>) -> crate::Result<ExecuteResult>
73 where
74 S: AsyncRead + AsyncWrite + Unpin + Send,
75 {
76 client.connection.flush_stream().await?;
77
78 let rpc_params = Client::<S>::rpc_params(self.sql);
79
80 client
81 .rpc_perform_query(RpcProcId::ExecuteSQL, rpc_params, self.params.into_iter())
82 .await?;
83
84 ExecuteResult::new(&mut client.connection).await
85 }
86
87 /// Executes SQL statements in the SQL Server, returning resulting rows.
88 /// Useful for `SELECT` statements. See [`Client#query`] for a simpler API
89 /// if the parameters are statically known.
90 ///
91 /// # Example
92 ///
93 /// ```
94 /// # use tiberius::{Config, Query};
95 /// # use tokio_util::compat::TokioAsyncWriteCompatExt;
96 /// # use std::env;
97 /// # #[tokio::main]
98 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
99 /// # let c_str = env::var("TIBERIUS_TEST_CONNECTION_STRING").unwrap_or(
100 /// # "server=tcp:localhost,1433;integratedSecurity=true;TrustServerCertificate=true".to_owned(),
101 /// # );
102 /// # let config = Config::from_ado_string(&c_str)?;
103 /// # let tcp = tokio::net::TcpStream::connect(config.get_addr()).await?;
104 /// # tcp.set_nodelay(true)?;
105 /// # let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?;
106 /// let mut query = Query::new("SELECT @P1, @P2, @P3");
107 ///
108 /// query.bind(1i32);
109 /// query.bind(2i32);
110 /// query.bind(3i32);
111 ///
112 /// let stream = query.query(&mut client).await?;
113 /// # Ok(())
114 /// # }
115 /// ```
116 ///
117 /// [`QueryStream`]: struct.QueryStream.html
118 /// [`ToSql`]: trait.ToSql.html
119 /// [`FromSql`]: trait.FromSql.html
120 /// [`Client#query`]: struct.Client.html#method.query
121 pub async fn query<'b, S>(self, client: &'b mut Client<S>) -> crate::Result<QueryStream<'b>>
122 where
123 S: AsyncRead + AsyncWrite + Unpin + Send,
124 {
125 client.connection.flush_stream().await?;
126 let rpc_params = Client::<S>::rpc_params(self.sql);
127
128 client
129 .rpc_perform_query(RpcProcId::ExecuteSQL, rpc_params, self.params.into_iter())
130 .await?;
131
132 let ts = TokenStream::new(&mut client.connection);
133 let mut result = QueryStream::new(ts.try_unfold());
134 result.forward_to_metadata().await?;
135
136 Ok(result)
137 }
138}