mysql_async/queryable/query_result/
mod.rs

1// Copyright (c) 2017 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use mysql_common::row::convert::FromRowError;
10
11use std::{borrow::Cow, fmt, marker::PhantomData, result::Result as StdResult, sync::Arc};
12
13use crate::{
14    conn::{routines::NextSetRoutine, PendingResult},
15    connection_like::Connection,
16    error::*,
17    prelude::{FromRow, Protocol},
18    Column, Row,
19};
20
21pub mod result_set_stream;
22mod tests;
23
24/// Result set metadata.
25#[derive(Debug, Clone, Eq, PartialEq)]
26pub enum ResultSetMeta {
27    /// Text result set, that may contain rows.
28    Text(Arc<[Column]>),
29    /// Binary result set, that may contain rows.
30    Binary(Arc<[Column]>),
31}
32
33impl ResultSetMeta {
34    fn columns(&self) -> &Arc<[Column]> {
35        match self {
36            ResultSetMeta::Text(cols) | ResultSetMeta::Binary(cols) => cols,
37        }
38    }
39}
40
41/// Result of a query or statement execution.
42///
43/// Represents an asynchronous query result, that may not be fully consumed.
44///
45/// # Note
46///
47/// Unconsumed query results are dropped implicitly when corresponding connection
48/// is dropped or queried. Also note, that in this case all remaining errors will be
49/// emitted to the caller:
50///
51/// ```rust
52/// # use mysql_async::test_misc::get_opts;
53/// # #[tokio::main]
54/// # async fn main() -> mysql_async::Result<()> {
55/// use mysql_async::*;
56/// use mysql_async::prelude::*;
57/// let mut conn = Conn::new(get_opts()).await?;
58///
59/// // second result set will contain an error,
60/// // but the first result set is ok, so this line will pass
61/// conn.query_iter("DO 1; BLABLA;").await?;
62/// // `QueryResult` was dropped withot being consumed
63///
64/// // driver must cleanup any unconsumed result to perform another query on `conn`,
65/// // so this operation will be performed implicitly, but the unconsumed result
66/// // contains an error claiming about 'BLABLA', so this error will be emitted here:
67/// assert!(conn.query_iter("DO 1").await.unwrap_err().to_string().contains("BLABLA"));
68///
69/// # conn.disconnect().await }
70/// ```
71pub struct QueryResult<'a, 't: 'a, P> {
72    conn: Connection<'a, 't>,
73    __phantom: PhantomData<P>,
74}
75
76impl<'a, 't: 'a, P> fmt::Debug for QueryResult<'a, 't, P> {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        f.debug_struct("QueryResult")
79            .field("conn", &self.conn)
80            .field("__phantom", &"PhantomData<P>")
81            .finish()
82    }
83}
84
85impl<'a, 't: 'a, P> QueryResult<'a, 't, P>
86where
87    P: Protocol,
88{
89    pub fn new<T: Into<Connection<'a, 't>>>(conn: T) -> Self {
90        QueryResult {
91            conn: conn.into(),
92            __phantom: PhantomData,
93        }
94    }
95
96    /// Returns `true` if this query result may contain rows.
97    ///
98    /// If `false` then no rows possible for this query tesult (e.g. result of an UPDATE query).
99    fn has_rows(&self) -> bool {
100        self.conn
101            .get_pending_result()
102            .map(|pending_result| match pending_result {
103                Some(PendingResult::Pending(meta)) => meta.columns().len() > 0,
104                Some(PendingResult::Taken(meta)) => meta.columns().len() > 0,
105                None => false,
106            })
107            .unwrap_or(false)
108    }
109
110    /// `true` if there are no more rows nor result sets in this query.
111    ///
112    /// This function will return `false` if the last result set was taken
113    /// by the [`QueryResult::stream`] that was dropped before being fully consumed
114    /// (i.e. caller will get `false` even if QueryResult data is reachable only for library internals).
115    pub fn is_empty(&self) -> bool {
116        !self.has_rows() && !self.conn.more_results_exists()
117    }
118
119    /// Low-level function that reads a result set row.
120    ///
121    /// Returns `None` if there are no more rows in the current set.
122    async fn next_row(&mut self, columns: Arc<[Column]>) -> crate::Result<Option<Row>> {
123        let mut row = None;
124
125        if columns.is_empty() {
126            // Empty, but not yet consumed result set.
127            self.conn.set_pending_result(None)?;
128        } else {
129            // Not yet consumed non-empty result set.
130            let packet = match self.conn.read_packet().await {
131                Ok(packet) => packet,
132                Err(err) => {
133                    // Next row contained an error. No more data will follow.
134                    self.conn.set_pending_result(None)?;
135                    return Err(err);
136                }
137            };
138
139            if P::is_last_result_set_packet(self.conn.capabilities(), &packet) {
140                // `packet` is a result set terminator.
141                self.conn.set_pending_result(None)?;
142            } else {
143                // `packet` is a result set row.
144                row = Some(P::read_result_set_row(&packet, columns)?);
145            }
146        }
147
148        Ok(row)
149    }
150
151    /// Low-level function that jumps to the next result set.
152    ///
153    /// Returns `false` if there are no more result sets.
154    async fn next_set(&mut self) -> crate::Result<bool> {
155        if self.conn.more_results_exists() {
156            // More data will follow.
157            self.conn.routine(NextSetRoutine::<P>::new()).await?;
158        }
159        Ok(self.conn.has_pending_result())
160    }
161
162    /// Low-level function that reads a next row and tries to jump
163    /// to the next result set if the current one is exhausted.
164    async fn next_row_or_next_set(&mut self, meta: ResultSetMeta) -> crate::Result<Option<Row>> {
165        let columns = meta.columns().clone();
166
167        self.next_row_or_next_set2(columns).await
168    }
169
170    /// Low-level function that reads a next row and tries to jump
171    /// to the next result set if the current one is exhausted.
172    async fn next_row_or_next_set2(
173        &mut self,
174        columns: Arc<[Column]>,
175    ) -> crate::Result<Option<Row>> {
176        if let Some(row) = self.next_row(columns).await? {
177            Ok(Some(row))
178        } else {
179            self.next_set().await?;
180            Ok(None)
181        }
182    }
183
184    /// Skips the taken result set.
185    async fn skip_taken(&mut self, meta: Arc<ResultSetMeta>) -> crate::Result<()> {
186        while (self.next_row_or_next_set((*meta).clone()).await?).is_some() {}
187        Ok(())
188    }
189
190    #[doc(hidden)]
191    pub async fn next(&mut self) -> Result<Option<Row>> {
192        loop {
193            match self.conn.use_pending_result()?.cloned() {
194                Some(PendingResult::Pending(meta)) => return self.next_row_or_next_set(meta).await,
195                Some(PendingResult::Taken(meta)) => self.skip_taken(meta).await?,
196                None => return Ok(None),
197            }
198        }
199    }
200
201    /// Last insert id, if any.
202    pub fn last_insert_id(&self) -> Option<u64> {
203        self.conn.last_insert_id()
204    }
205
206    /// Number of affected rows as reported by the server, or `0`.
207    pub fn affected_rows(&self) -> u64 {
208        self.conn.affected_rows()
209    }
210
211    /// Text information as reported by the server, or an empty string.
212    pub fn info(&self) -> Cow<'_, str> {
213        self.conn.info()
214    }
215
216    /// Number of warnings as reported by the server, or `0`.
217    pub fn warnings(&self) -> u16 {
218        self.conn.get_warnings()
219    }
220
221    /// Collects the current result set of this query result.
222    ///
223    /// It is parametrized by `R` and internally calls `R::from_row(Row)` on each row.
224    ///
225    /// It will collect rows up to a neares result set boundary. This means that you should call
226    /// `collect` as many times as result sets in your query result. For example query
227    /// `SELECT 'foo'; SELECT 'foo', 'bar';` will produce `QueryResult` with two result sets in it.
228    /// One can use `QueryResult::is_empty` to make sure that there is no more result sets.
229    ///
230    /// # Panic
231    ///
232    /// It'll panic if any row isn't convertible to `R` (i.e. programmer error or unknown schema).
233    /// * In case of programmer error see [`FromRow`] docs;
234    /// * In case of unknown schema use [`QueryResult::try_collect`].
235    pub async fn collect<R>(&mut self) -> Result<Vec<R>>
236    where
237        R: FromRow + Send + 'static,
238    {
239        self.reduce(Vec::new(), |mut acc, row| {
240            acc.push(FromRow::from_row(row));
241            acc
242        })
243        .await
244    }
245
246    /// Collects the current result set of this query result.
247    ///
248    /// It works the same way as [`QueryResult::collect`] but won't panic if row isn't convertible
249    /// to `R`.
250    pub async fn try_collect<R>(&mut self) -> Result<Vec<StdResult<R, FromRowError>>>
251    where
252        R: FromRow + Send + 'static,
253    {
254        self.reduce(Vec::new(), |mut acc, row| {
255            acc.push(FromRow::from_row_opt(row));
256            acc
257        })
258        .await
259    }
260
261    /// Collects the current result set of this query result and drops everything else.
262    ///
263    /// # Panic
264    ///
265    /// It'll panic if any row isn't convertible to `R` (i.e. programmer error or unknown schema).
266    /// * In case of programmer error see `FromRow` docs;
267    /// * In case of unknown schema use [`QueryResult::try_collect`].
268    pub async fn collect_and_drop<R>(mut self) -> Result<Vec<R>>
269    where
270        R: FromRow + Send + 'static,
271    {
272        let output = self.collect::<R>().await?;
273        self.drop_result().await?;
274        Ok(output)
275    }
276
277    /// Collects the current result set of this query result and drops everything else.
278    ///
279    /// It works the same way as [`QueryResult::collect_and_drop`] but won't panic if row isn't
280    /// convertible to `R`.
281    pub async fn try_collect_and_drop<R>(mut self) -> Result<Vec<StdResult<R, FromRowError>>>
282    where
283        R: FromRow + Send + 'static,
284    {
285        let output = self.try_collect().await?;
286        self.drop_result().await?;
287        Ok(output)
288    }
289
290    /// Executes `fun` on every row of the current result set.
291    ///
292    /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
293    pub async fn for_each<F>(&mut self, mut fun: F) -> Result<()>
294    where
295        F: FnMut(Row),
296    {
297        if self.is_empty() {
298            Ok(())
299        } else {
300            while let Some(row) = self.next().await? {
301                fun(row);
302            }
303            Ok(())
304        }
305    }
306
307    /// Executes `fun` on every row of the current result set and drops everything else.
308    pub async fn for_each_and_drop<F>(mut self, fun: F) -> Result<()>
309    where
310        F: FnMut(Row),
311    {
312        self.for_each(fun).await?;
313        self.drop_result().await?;
314        Ok(())
315    }
316
317    /// Maps every row of the current result set to `U` using `fun`.
318    ///
319    /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
320    pub async fn map<F, U>(&mut self, mut fun: F) -> Result<Vec<U>>
321    where
322        F: FnMut(Row) -> U,
323    {
324        let mut acc = Vec::new();
325        while let Some(row) = self.next().await? {
326            acc.push(fun(crate::from_row(row)));
327        }
328        Ok(acc)
329    }
330
331    /// Map every row of the current result set to `U` using `fun` and drops everything else.
332    pub async fn map_and_drop<F, U>(mut self, fun: F) -> Result<Vec<U>>
333    where
334        F: FnMut(Row) -> U,
335    {
336        let rows = self.map(fun).await?;
337        self.drop_result().await?;
338        Ok(rows)
339    }
340
341    /// Reduces rows of the current result set to `U` using `fun`.
342    ///
343    /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
344    pub async fn reduce<T, F, U>(&mut self, mut init: U, mut fun: F) -> Result<U>
345    where
346        F: FnMut(U, T) -> U,
347        T: FromRow + Send + 'static,
348    {
349        while let Some(row) = self.next().await? {
350            init = fun(init, crate::from_row(row));
351        }
352        Ok(init)
353    }
354
355    /// Reduces rows of the current result set to `U` using `fun` and drops everything else.
356    pub async fn reduce_and_drop<T, F, U>(mut self, init: U, fun: F) -> Result<U>
357    where
358        F: FnMut(U, T) -> U,
359        T: FromRow + Send + 'static,
360    {
361        let acc = self.reduce(init, fun).await?;
362        self.drop_result().await?;
363        Ok(acc)
364    }
365
366    /// Drops this query result.
367    pub async fn drop_result(mut self) -> Result<()> {
368        loop {
369            while self.next().await?.is_some() {}
370            if !self.conn.has_pending_result() {
371                break Ok(());
372            }
373        }
374    }
375
376    /// Returns a reference to a columns list of this query result.
377    ///
378    /// Empty list means that this result set was never meant to contain rows.
379    pub fn columns_ref(&self) -> &[Column] {
380        self.conn
381            .get_pending_result()
382            .ok()
383            .flatten()
384            .map(|meta| match meta {
385                PendingResult::Pending(meta) => &meta.columns()[..],
386                PendingResult::Taken(meta) => &meta.columns()[..],
387            })
388            .unwrap_or_default()
389    }
390
391    /// Returns a copy of a columns list of this query result.
392    pub fn columns(&self) -> Option<Arc<[Column]>> {
393        self.conn
394            .get_pending_result()
395            .ok()
396            .flatten()
397            .map(|meta| match meta {
398                PendingResult::Pending(meta) => meta.columns(),
399                PendingResult::Taken(meta) => meta.columns(),
400            })
401            .cloned()
402    }
403}