mysql_async/queryable/
mod.rs

1// Copyright (c) 2017 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use futures_util::FutureExt;
10use mysql_common::{
11    constants::MAX_PAYLOAD_LEN,
12    io::ParseBuf,
13    proto::{Binary, Text},
14    row::RowDeserializer,
15    value::ServerSide,
16};
17
18use std::{fmt, sync::Arc};
19
20use self::{
21    query_result::QueryResult,
22    stmt::Statement,
23    transaction::{Transaction, TxStatus},
24};
25
26use crate::{
27    conn::routines::{PingRoutine, QueryRoutine},
28    consts::CapabilityFlags,
29    error::*,
30    prelude::{FromRow, StatementLike},
31    query::AsQuery,
32    queryable::query_result::ResultSetMeta,
33    tracing_utils::{LevelInfo, LevelTrace, TracingLevel},
34    BoxFuture, Column, Conn, Connection, Params, ResultSetStream, Row,
35};
36
37pub mod query_result;
38pub mod stmt;
39pub mod transaction;
40
41pub trait Protocol: fmt::Debug + Send + Sync + 'static {
42    /// Returns `ResultSetMeta`, that corresponds to the current protocol.
43    fn result_set_meta(columns: Arc<[Column]>) -> ResultSetMeta;
44    fn read_result_set_row(packet: &[u8], columns: Arc<[Column]>) -> Result<Row>;
45    fn is_last_result_set_packet(capabilities: CapabilityFlags, packet: &[u8]) -> bool {
46        if capabilities.contains(CapabilityFlags::CLIENT_DEPRECATE_EOF) {
47            packet[0] == 0xFE && packet.len() < MAX_PAYLOAD_LEN
48        } else {
49            packet[0] == 0xFE && packet.len() < 8
50        }
51    }
52}
53
54/// Phantom struct used to specify MySql text protocol.
55#[derive(Debug)]
56pub struct TextProtocol;
57
58/// Phantom struct used to specify MySql binary protocol.
59#[derive(Debug)]
60pub struct BinaryProtocol;
61
62impl Protocol for TextProtocol {
63    fn result_set_meta(columns: Arc<[Column]>) -> ResultSetMeta {
64        ResultSetMeta::Text(columns)
65    }
66
67    fn read_result_set_row(packet: &[u8], columns: Arc<[Column]>) -> Result<Row> {
68        ParseBuf(packet)
69            .parse::<RowDeserializer<ServerSide, Text>>(columns)
70            .map(Into::into)
71            .map_err(Into::into)
72    }
73}
74
75impl Protocol for BinaryProtocol {
76    fn result_set_meta(columns: Arc<[Column]>) -> ResultSetMeta {
77        ResultSetMeta::Binary(columns)
78    }
79
80    fn read_result_set_row(packet: &[u8], columns: Arc<[Column]>) -> Result<Row> {
81        ParseBuf(packet)
82            .parse::<RowDeserializer<ServerSide, Binary>>(columns)
83            .map(Into::into)
84            .map_err(Into::into)
85    }
86}
87
88impl Conn {
89    /// The purpose of this function is to rollback a transaction or to drop query result in cases,
90    /// where `Transaction` was dropped without an explicit call to `commit` or `rollback`,
91    /// or where `QueryResult` was dropped without being consumed.
92    ///
93    /// The difference betwee this function and [`Conn::cleanup`] is that this function
94    /// won't rollback existing transaction. Another difference, is that this function
95    /// won't ignore non-fatal errors.
96    pub(crate) async fn clean_dirty(&mut self) -> Result<()> {
97        self.drop_result().await?;
98        if self.get_tx_status() == TxStatus::RequiresRollback {
99            self.rollback_transaction().await?;
100        }
101        Ok(())
102    }
103
104    /// Low level function that performs a text query.
105    pub(crate) async fn raw_query<'a, Q, L: TracingLevel>(&'a mut self, query: Q) -> Result<()>
106    where
107        Q: AsQuery + 'a,
108    {
109        self.routine(QueryRoutine::<'_, L>::new(query.as_query().as_ref()))
110            .await
111    }
112
113    /// Used for internal querying of connection settings,
114    /// bypassing instrumentation meant for user queries.
115    // This is a merge of `Queryable::query_first` and `Conn::query_iter`.
116    // TODO: find a cleaner way without duplicating code.
117    pub(crate) fn query_internal<'a, T, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Option<T>>
118    where
119        Q: AsQuery + 'a,
120        T: FromRow + Send + 'static,
121    {
122        async move {
123            self.raw_query::<'_, _, LevelTrace>(query).await?;
124            Ok(QueryResult::<'_, '_, TextProtocol>::new(self)
125                .collect_and_drop::<T>()
126                .await?
127                .pop())
128        }
129        .boxed()
130    }
131}
132
133/// Methods of this trait are used to execute database queries.
134///
135/// `Conn` is a `Queryable` as well as `Transaction`.
136pub trait Queryable: Send {
137    /// Executes `COM_PING`.
138    fn ping(&mut self) -> BoxFuture<'_, ()>;
139
140    /// Performs the given query and returns the result.
141    fn query_iter<'a, Q>(
142        &'a mut self,
143        query: Q,
144    ) -> BoxFuture<'a, QueryResult<'a, 'static, TextProtocol>>
145    where
146        Q: AsQuery + 'a;
147
148    /// Prepares the given statement.
149    ///
150    /// Note, that `Statement` will exist only in the context of this queryable.
151    ///
152    /// Also note, that this call may close the least recently used statement
153    /// if statement cache is at its capacity (see. [`stmt_cache_size`][stmt_cache_size]).
154    ///
155    /// [stmt_cache_size]: crate::Opts::stmt_cache_size
156    fn prep<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Statement>
157    where
158        Q: AsQuery + 'a;
159
160    /// Closes the given statement.
161    ///
162    /// Usually there is no need to explicitly close statements
163    /// (see. [`stmt_cache_size`][stmt_cache_size]).
164    ///
165    /// [stmt_cache_size]: crate::Opts::stmt_cache_size
166    fn close(&mut self, stmt: Statement) -> BoxFuture<'_, ()>;
167
168    /// Executes the given statement with given params.
169    ///
170    /// It'll prepare `stmt`, if necessary.
171    fn exec_iter<'a: 's, 's, Q, P>(
172        &'a mut self,
173        stmt: Q,
174        params: P,
175    ) -> BoxFuture<'s, QueryResult<'a, 'static, BinaryProtocol>>
176    where
177        Q: StatementLike + 'a,
178        P: Into<Params>;
179
180    /// Performs the given query and collects the first result set.
181    ///
182    /// ## Conversion
183    ///
184    /// This stream will convert each row into `T` using [`FromRow`] implementation.
185    /// If the row type is unknown please use the [`Row`] type for `T`
186    /// to make this conversion infallible.
187    fn query<'a, T, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Vec<T>>
188    where
189        Q: AsQuery + 'a,
190        T: FromRow + Send + 'static,
191    {
192        async move { self.query_iter(query).await?.collect_and_drop::<T>().await }.boxed()
193    }
194
195    /// Performs the given query and returns the first row of the first result set.
196    ///
197    /// ## Conversion
198    ///
199    /// This stream will convert each row into `T` using [`FromRow`] implementation.
200    /// If the row type is unknown please use the [`Row`] type for `T`
201    /// to make this conversion infallible.
202    fn query_first<'a, T, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Option<T>>
203    where
204        Q: AsQuery + 'a,
205        T: FromRow + Send + 'static,
206    {
207        async move {
208            let mut result = self.query_iter(query).await?;
209            let output = if result.is_empty() {
210                None
211            } else {
212                result.next().await?.map(crate::from_row)
213            };
214            result.drop_result().await?;
215            Ok(output)
216        }
217        .boxed()
218    }
219
220    /// Performs the given query and maps each row of the first result set.
221    ///
222    /// ## Conversion
223    ///
224    /// This stream will convert each row into `T` using [`FromRow`] implementation.
225    /// If the row type is unknown please use the [`Row`] type for `T`
226    /// to make this conversion infallible.
227    fn query_map<'a, T, F, Q, U>(&'a mut self, query: Q, mut f: F) -> BoxFuture<'a, Vec<U>>
228    where
229        Q: AsQuery + 'a,
230        T: FromRow + Send + 'static,
231        F: FnMut(T) -> U + Send + 'a,
232        U: Send,
233    {
234        async move {
235            self.query_fold(query, Vec::new(), |mut acc, row| {
236                acc.push(f(crate::from_row(row)));
237                acc
238            })
239            .await
240        }
241        .boxed()
242    }
243
244    /// Performs the given query and folds the first result set to a single value.
245    ///
246    /// ## Conversion
247    ///
248    /// This stream will convert each row into `T` using [`FromRow`] implementation.
249    /// If the row type is unknown please use the [`Row`] type for `T`
250    /// to make this conversion infallible.
251    fn query_fold<'a, T, F, Q, U>(&'a mut self, query: Q, init: U, mut f: F) -> BoxFuture<'a, U>
252    where
253        Q: AsQuery + 'a,
254        T: FromRow + Send + 'static,
255        F: FnMut(U, T) -> U + Send + 'a,
256        U: Send + 'a,
257    {
258        async move {
259            self.query_iter(query)
260                .await?
261                .reduce_and_drop(init, |acc, row| f(acc, crate::from_row(row)))
262                .await
263        }
264        .boxed()
265    }
266
267    /// Performs the given query and drops the query result.
268    fn query_drop<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, ()>
269    where
270        Q: AsQuery + 'a,
271    {
272        async move { self.query_iter(query).await?.drop_result().await }.boxed()
273    }
274
275    /// Executes the given statement for each item in the given params iterator.
276    ///
277    /// It'll prepare `stmt` (once), if necessary.
278    fn exec_batch<'a: 'b, 'b, S, P, I>(&'a mut self, stmt: S, params_iter: I) -> BoxFuture<'b, ()>
279    where
280        S: StatementLike + 'b,
281        I: IntoIterator<Item = P> + Send + 'b,
282        I::IntoIter: Send,
283        P: Into<Params> + Send;
284
285    /// Executes the given statement and collects the first result set.
286    ///
287    /// It'll prepare `stmt`, if necessary.
288    ///
289    /// ## Conversion
290    ///
291    /// This stream will convert each row into `T` using [`FromRow`] implementation.
292    /// If the row type is unknown please use the [`Row`] type for `T`
293    /// to make this conversion infallible.
294    fn exec<'a: 'b, 'b, T, S, P>(&'a mut self, stmt: S, params: P) -> BoxFuture<'b, Vec<T>>
295    where
296        S: StatementLike + 'b,
297        P: Into<Params> + Send + 'b,
298        T: FromRow + Send + 'static,
299    {
300        async move {
301            self.exec_iter(stmt, params)
302                .await?
303                .collect_and_drop::<T>()
304                .await
305        }
306        .boxed()
307    }
308
309    /// Executes the given statement and returns the first row of the first result set.
310    ///
311    /// It'll prepare `stmt`, if necessary.
312    ///
313    /// ## Conversion
314    ///
315    /// This stream will convert each row into `T` using [`FromRow`] implementation.
316    /// If the row type is unknown please use the [`Row`] type for `T`
317    /// to make this conversion infallible.
318    fn exec_first<'a: 'b, 'b, T, S, P>(&'a mut self, stmt: S, params: P) -> BoxFuture<'b, Option<T>>
319    where
320        S: StatementLike + 'b,
321        P: Into<Params> + Send + 'b,
322        T: FromRow + Send + 'static,
323    {
324        async move {
325            let mut result = self.exec_iter(stmt, params).await?;
326            let row = if result.is_empty() {
327                None
328            } else {
329                result.next().await?
330            };
331            result.drop_result().await?;
332            Ok(row.map(crate::from_row))
333        }
334        .boxed()
335    }
336
337    /// Executes the given stmt and maps each row of the first result set.
338    ///
339    /// It'll prepare `stmt`, if necessary.
340    ///
341    /// ## Conversion
342    ///
343    /// This stream will convert each row into `T` using [`FromRow`] implementation.
344    /// If the row type is unknown please use the [`Row`] type for `T`
345    /// to make this conversion infallible.
346    fn exec_map<'a: 'b, 'b, T, S, P, U, F>(
347        &'a mut self,
348        stmt: S,
349        params: P,
350        mut f: F,
351    ) -> BoxFuture<'b, Vec<U>>
352    where
353        S: StatementLike + 'b,
354        P: Into<Params> + Send + 'b,
355        T: FromRow + Send + 'static,
356        F: FnMut(T) -> U + Send + 'a,
357        U: Send + 'a,
358    {
359        async move {
360            self.exec_fold(stmt, params, Vec::new(), |mut acc, row| {
361                acc.push(f(crate::from_row(row)));
362                acc
363            })
364            .await
365        }
366        .boxed()
367    }
368
369    /// Executes the given stmt and folds the first result set to a signel value.
370    ///
371    /// It'll prepare `stmt`, if necessary.
372    ///
373    /// ## Conversion
374    ///
375    /// This stream will convert each row into `T` using [`FromRow`] implementation.
376    /// If the row type is unknown please use the [`Row`] type for `T`
377    /// to make this conversion infallible.
378    fn exec_fold<'a: 'b, 'b, T, S, P, U, F>(
379        &'a mut self,
380        stmt: S,
381        params: P,
382        init: U,
383        mut f: F,
384    ) -> BoxFuture<'b, U>
385    where
386        S: StatementLike + 'b,
387        P: Into<Params> + Send + 'b,
388        T: FromRow + Send + 'static,
389        F: FnMut(U, T) -> U + Send + 'a,
390        U: Send + 'a,
391    {
392        async move {
393            self.exec_iter(stmt, params)
394                .await?
395                .reduce_and_drop(init, |acc, row| f(acc, crate::from_row(row)))
396                .await
397        }
398        .boxed()
399    }
400
401    /// Executes the given statement and drops the result.
402    fn exec_drop<'a: 'b, 'b, S, P>(&'a mut self, stmt: S, params: P) -> BoxFuture<'b, ()>
403    where
404        S: StatementLike + 'b,
405        P: Into<Params> + Send + 'b,
406    {
407        async move { self.exec_iter(stmt, params).await?.drop_result().await }.boxed()
408    }
409
410    /// Returns a stream over the first result set.
411    ///
412    /// Please see [`QueryResult::stream_and_drop`][stream_and_drop].
413    ///
414    /// [stream_and_drop]: crate::QueryResult::stream_and_drop
415    fn query_stream<'a, T, Q>(
416        &'a mut self,
417        query: Q,
418    ) -> BoxFuture<'a, ResultSetStream<'a, 'a, 'static, T, TextProtocol>>
419    where
420        T: Unpin + FromRow + Send + 'static,
421        Q: AsQuery + 'a,
422    {
423        async move {
424            self.query_iter(query)
425                .await?
426                .stream_and_drop()
427                .await
428                .transpose()
429                .expect("At least one result set is expected")
430        }
431        .boxed()
432    }
433
434    /// Returns a stream over the first result set.
435    ///
436    /// Please see [`QueryResult::stream_and_drop`][stream_and_drop].
437    ///
438    /// [stream_and_drop]: crate::QueryResult::stream_and_drop
439    fn exec_stream<'a: 's, 's, T, Q, P>(
440        &'a mut self,
441        stmt: Q,
442        params: P,
443    ) -> BoxFuture<'s, ResultSetStream<'a, 'a, 'static, T, BinaryProtocol>>
444    where
445        T: Unpin + FromRow + Send + 'static,
446        Q: StatementLike + 'a,
447        P: Into<Params> + Send + 's,
448    {
449        async move {
450            self.exec_iter(stmt, params)
451                .await?
452                .stream_and_drop()
453                .await
454                .transpose()
455                .expect("At least one result set is expected")
456        }
457        .boxed()
458    }
459}
460
461impl Queryable for Conn {
462    fn ping(&mut self) -> BoxFuture<'_, ()> {
463        async move {
464            self.routine(PingRoutine).await?;
465            Ok(())
466        }
467        .boxed()
468    }
469
470    fn query_iter<'a, Q>(
471        &'a mut self,
472        query: Q,
473    ) -> BoxFuture<'a, QueryResult<'a, 'static, TextProtocol>>
474    where
475        Q: AsQuery + 'a,
476    {
477        async move {
478            self.raw_query::<'_, _, LevelInfo>(query).await?;
479            Ok(QueryResult::new(self))
480        }
481        .boxed()
482    }
483
484    fn prep<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Statement>
485    where
486        Q: AsQuery + 'a,
487    {
488        async move { self.get_statement(query.as_query()).await }.boxed()
489    }
490
491    fn close(&mut self, stmt: Statement) -> BoxFuture<'_, ()> {
492        async move {
493            self.stmt_cache_mut().remove(stmt.id());
494            self.close_statement(stmt.id()).await
495        }
496        .boxed()
497    }
498
499    fn exec_iter<'a: 's, 's, Q, P>(
500        &'a mut self,
501        stmt: Q,
502        params: P,
503    ) -> BoxFuture<'s, QueryResult<'a, 'static, BinaryProtocol>>
504    where
505        Q: StatementLike + 'a,
506        P: Into<Params>,
507    {
508        let params = params.into();
509        async move {
510            let statement = self.get_statement(stmt).await?;
511            self.execute_statement(&statement, params).await?;
512            Ok(QueryResult::new(self))
513        }
514        .boxed()
515    }
516
517    fn exec_batch<'a: 'b, 'b, S, P, I>(&'a mut self, stmt: S, params_iter: I) -> BoxFuture<'b, ()>
518    where
519        S: StatementLike + 'b,
520        I: IntoIterator<Item = P> + Send + 'b,
521        I::IntoIter: Send,
522        P: Into<Params> + Send,
523    {
524        async move {
525            let statement = self.get_statement(stmt).await?;
526            for params in params_iter {
527                self.execute_statement(&statement, params).await?;
528                QueryResult::<BinaryProtocol>::new(&mut *self)
529                    .drop_result()
530                    .await?;
531            }
532            Ok(())
533        }
534        .boxed()
535    }
536}
537
538impl Queryable for Transaction<'_> {
539    fn ping(&mut self) -> BoxFuture<'_, ()> {
540        self.0.as_mut().ping()
541    }
542
543    fn query_iter<'a, Q>(
544        &'a mut self,
545        query: Q,
546    ) -> BoxFuture<'a, QueryResult<'a, 'static, TextProtocol>>
547    where
548        Q: AsQuery + 'a,
549    {
550        self.0.as_mut().query_iter(query)
551    }
552
553    fn prep<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Statement>
554    where
555        Q: AsQuery + 'a,
556    {
557        self.0.as_mut().prep(query)
558    }
559
560    fn close(&mut self, stmt: Statement) -> BoxFuture<'_, ()> {
561        self.0.as_mut().close(stmt)
562    }
563
564    fn exec_iter<'a: 's, 's, Q, P>(
565        &'a mut self,
566        stmt: Q,
567        params: P,
568    ) -> BoxFuture<'s, QueryResult<'a, 'static, BinaryProtocol>>
569    where
570        Q: StatementLike + 'a,
571        P: Into<Params>,
572    {
573        self.0.as_mut().exec_iter(stmt, params)
574    }
575
576    fn exec_batch<'a: 'b, 'b, S, P, I>(&'a mut self, stmt: S, params_iter: I) -> BoxFuture<'b, ()>
577    where
578        S: StatementLike + 'b,
579        I: IntoIterator<Item = P> + Send + 'b,
580        I::IntoIter: Send,
581        P: Into<Params> + Send,
582    {
583        self.0.as_mut().exec_batch(stmt, params_iter)
584    }
585}
586
587impl<'c, 't: 'c> Queryable for Connection<'c, 't> {
588    #[inline]
589    fn ping(&mut self) -> BoxFuture<'_, ()> {
590        self.as_mut().ping()
591    }
592
593    #[inline]
594    fn query_iter<'a, Q>(
595        &'a mut self,
596        query: Q,
597    ) -> BoxFuture<'a, QueryResult<'a, 'static, TextProtocol>>
598    where
599        Q: AsQuery + 'a,
600    {
601        self.as_mut().query_iter(query)
602    }
603
604    fn prep<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Statement>
605    where
606        Q: AsQuery + 'a,
607    {
608        self.as_mut().prep(query)
609    }
610
611    fn close(&mut self, stmt: Statement) -> BoxFuture<'_, ()> {
612        self.as_mut().close(stmt)
613    }
614
615    fn exec_iter<'a: 's, 's, Q, P>(
616        &'a mut self,
617        stmt: Q,
618        params: P,
619    ) -> BoxFuture<'s, QueryResult<'a, 'static, BinaryProtocol>>
620    where
621        Q: StatementLike + 'a,
622        P: Into<Params>,
623    {
624        self.as_mut().exec_iter(stmt, params)
625    }
626
627    fn exec_batch<'a: 'b, 'b, S, P, I>(&'a mut self, stmt: S, params_iter: I) -> BoxFuture<'b, ()>
628    where
629        S: StatementLike + 'b,
630        I: IntoIterator<Item = P> + Send + 'b,
631        I::IntoIter: Send,
632        P: Into<Params> + Send,
633    {
634        self.as_mut().exec_batch(stmt, params_iter)
635    }
636}
637
638#[cfg(test)]
639mod tests {
640    use crate::{error::Result, prelude::*, test_misc::get_opts, Conn};
641
642    #[tokio::test]
643    async fn should_prep() -> Result<()> {
644        const NAMED: &str = "SELECT :foo, :bar, :foo";
645        const POSITIONAL: &str = "SELECT ?, ?, ?";
646
647        let mut conn = Conn::new(get_opts()).await?;
648
649        let stmt_named = conn.prep(NAMED).await?;
650        let stmt_positional = conn.prep(POSITIONAL).await?;
651
652        let result_stmt_named: Option<(String, u8, String)> = conn
653            .exec_first(&stmt_named, params! { "foo" => "bar", "bar" => 42 })
654            .await?;
655        let result_str_named: Option<(String, u8, String)> = conn
656            .exec_first(NAMED, params! { "foo" => "bar", "bar" => 42 })
657            .await?;
658
659        let result_stmt_positional: Option<(String, u8, String)> = conn
660            .exec_first(&stmt_positional, ("bar", 42, "bar"))
661            .await?;
662        let result_str_positional: Option<(String, u8, String)> =
663            conn.exec_first(NAMED, ("bar", 42, "bar")).await?;
664
665        assert_eq!(
666            Some(("bar".to_owned(), 42_u8, "bar".to_owned())),
667            result_stmt_named
668        );
669        assert_eq!(result_stmt_named, result_str_named);
670        assert_eq!(result_str_named, result_stmt_positional);
671        assert_eq!(result_stmt_positional, result_str_positional);
672
673        conn.disconnect().await?;
674
675        Ok(())
676    }
677}