mysql_async/queryable/query_result/mod.rs
1// Copyright (c) 2017 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use mysql_common::row::convert::FromRowError;
10
11use std::{borrow::Cow, fmt, marker::PhantomData, result::Result as StdResult, sync::Arc};
12
13use crate::{
14 conn::{routines::NextSetRoutine, PendingResult},
15 connection_like::Connection,
16 error::*,
17 prelude::{FromRow, Protocol},
18 Column, Row,
19};
20
21pub mod result_set_stream;
22mod tests;
23
/// Result set metadata.
///
/// The variant records whether the result set is a text or a binary one;
/// in both cases the payload is the shared list of the result set's columns.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ResultSetMeta {
    /// Text result set, that may contain rows.
    Text(Arc<[Column]>),
    /// Binary result set, that may contain rows.
    Binary(Arc<[Column]>),
}
32
33impl ResultSetMeta {
34 fn columns(&self) -> &Arc<[Column]> {
35 match self {
36 ResultSetMeta::Text(cols) | ResultSetMeta::Binary(cols) => cols,
37 }
38 }
39}
40
/// Result of a query or statement execution.
///
/// Represents an asynchronous query result, that may not be fully consumed.
///
/// # Note
///
/// Unconsumed query results are dropped implicitly when corresponding connection
/// is dropped or queried. Also note, that in this case all remaining errors will be
/// emitted to the caller:
///
/// ```rust
/// # use mysql_async::test_misc::get_opts;
/// # #[tokio::main]
/// # async fn main() -> mysql_async::Result<()> {
/// use mysql_async::*;
/// use mysql_async::prelude::*;
/// let mut conn = Conn::new(get_opts()).await?;
///
/// // second result set will contain an error,
/// // but the first result set is ok, so this line will pass
/// conn.query_iter("DO 1; BLABLA;").await?;
/// // `QueryResult` was dropped without being consumed
///
/// // driver must cleanup any unconsumed result to perform another query on `conn`,
/// // so this operation will be performed implicitly, but the unconsumed result
/// // contains an error complaining about 'BLABLA', so this error will be emitted here:
/// assert!(conn.query_iter("DO 1").await.unwrap_err().to_string().contains("BLABLA"));
///
/// # conn.disconnect().await }
/// ```
pub struct QueryResult<'a, 't: 'a, P> {
    /// Connection this result is read from; it also holds the pending result state.
    conn: Connection<'a, 't>,
    /// Marker for the protocol type `P`; no value of `P` is stored.
    __phantom: PhantomData<P>,
}
75
76impl<'a, 't: 'a, P> fmt::Debug for QueryResult<'a, 't, P> {
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 f.debug_struct("QueryResult")
79 .field("conn", &self.conn)
80 .field("__phantom", &"PhantomData<P>")
81 .finish()
82 }
83}
84
85impl<'a, 't: 'a, P> QueryResult<'a, 't, P>
86where
87 P: Protocol,
88{
89 pub fn new<T: Into<Connection<'a, 't>>>(conn: T) -> Self {
90 QueryResult {
91 conn: conn.into(),
92 __phantom: PhantomData,
93 }
94 }
95
96 /// Returns `true` if this query result may contain rows.
97 ///
98 /// If `false` then no rows possible for this query tesult (e.g. result of an UPDATE query).
99 fn has_rows(&self) -> bool {
100 self.conn
101 .get_pending_result()
102 .map(|pending_result| match pending_result {
103 Some(PendingResult::Pending(meta)) => meta.columns().len() > 0,
104 Some(PendingResult::Taken(meta)) => meta.columns().len() > 0,
105 None => false,
106 })
107 .unwrap_or(false)
108 }
109
110 /// `true` if there are no more rows nor result sets in this query.
111 ///
112 /// This function will return `false` if the last result set was taken
113 /// by the [`QueryResult::stream`] that was dropped before being fully consumed
114 /// (i.e. caller will get `false` even if QueryResult data is reachable only for library internals).
115 pub fn is_empty(&self) -> bool {
116 !self.has_rows() && !self.conn.more_results_exists()
117 }
118
119 /// Low-level function that reads a result set row.
120 ///
121 /// Returns `None` if there are no more rows in the current set.
122 async fn next_row(&mut self, columns: Arc<[Column]>) -> crate::Result<Option<Row>> {
123 let mut row = None;
124
125 if columns.is_empty() {
126 // Empty, but not yet consumed result set.
127 self.conn.as_mut().set_pending_result(None)?;
128 } else {
129 // Not yet consumed non-empty result set.
130 let packet = match self.conn.as_mut().read_packet().await {
131 Ok(packet) => packet,
132 Err(err) => {
133 // Next row contained an error. No more data will follow.
134 self.conn.as_mut().set_pending_result(None)?;
135 return Err(err);
136 }
137 };
138
139 if P::is_last_result_set_packet(self.conn.capabilities(), &packet) {
140 // `packet` is a result set terminator.
141 self.conn.as_mut().set_pending_result(None)?;
142 } else {
143 // `packet` is a result set row.
144 row = Some(P::read_result_set_row(&packet, columns)?);
145 }
146 }
147
148 Ok(row)
149 }
150
151 /// Low-level function that jumps to the next result set.
152 ///
153 /// Returns `false` if there are no more result sets.
154 async fn next_set(&mut self) -> crate::Result<bool> {
155 if self.conn.more_results_exists() {
156 // More data will follow.
157 self.conn
158 .as_mut()
159 .routine(NextSetRoutine::<P>::new())
160 .await?;
161 }
162 Ok(self.conn.has_pending_result())
163 }
164
165 /// Low-level function that reads a next row and tries to jump
166 /// to the next result set if the current one is exhausted.
167 async fn next_row_or_next_set(&mut self, meta: ResultSetMeta) -> crate::Result<Option<Row>> {
168 let columns = meta.columns().clone();
169
170 self.next_row_or_next_set2(columns).await
171 }
172
173 /// Low-level function that reads a next row and tries to jump
174 /// to the next result set if the current one is exhausted.
175 async fn next_row_or_next_set2(
176 &mut self,
177 columns: Arc<[Column]>,
178 ) -> crate::Result<Option<Row>> {
179 if let Some(row) = self.next_row(columns).await? {
180 Ok(Some(row))
181 } else {
182 self.next_set().await?;
183 Ok(None)
184 }
185 }
186
187 /// Skips the taken result set.
188 async fn skip_taken(&mut self, meta: Arc<ResultSetMeta>) -> crate::Result<()> {
189 while (self.next_row_or_next_set((*meta).clone()).await?).is_some() {}
190 Ok(())
191 }
192
193 #[doc(hidden)]
194 pub async fn next(&mut self) -> Result<Option<Row>> {
195 loop {
196 match self.conn.as_mut().use_pending_result()?.cloned() {
197 Some(PendingResult::Pending(meta)) => return self.next_row_or_next_set(meta).await,
198 Some(PendingResult::Taken(meta)) => self.skip_taken(meta).await?,
199 None => return Ok(None),
200 }
201 }
202 }
203
    /// Last insert id, if any.
    ///
    /// The value is taken from the underlying connection.
    pub fn last_insert_id(&self) -> Option<u64> {
        self.conn.last_insert_id()
    }
208
    /// Number of affected rows as reported by the server, or `0`.
    ///
    /// The value is taken from the underlying connection.
    pub fn affected_rows(&self) -> u64 {
        self.conn.affected_rows()
    }
213
    /// Text information as reported by the server, or an empty string.
    ///
    /// The value is taken from the underlying connection.
    pub fn info(&self) -> Cow<'_, str> {
        self.conn.info()
    }
218
    /// Number of warnings as reported by the server, or `0`.
    ///
    /// The value is taken from the underlying connection.
    pub fn warnings(&self) -> u16 {
        self.conn.get_warnings()
    }
223
224 /// Collects the current result set of this query result.
225 ///
226 /// It is parametrized by `R` and internally calls `R::from_row(Row)` on each row.
227 ///
228 /// It will collect rows up to a neares result set boundary. This means that you should call
229 /// `collect` as many times as result sets in your query result. For example query
230 /// `SELECT 'foo'; SELECT 'foo', 'bar';` will produce `QueryResult` with two result sets in it.
231 /// One can use `QueryResult::is_empty` to make sure that there is no more result sets.
232 ///
233 /// # Panic
234 ///
235 /// It'll panic if any row isn't convertible to `R` (i.e. programmer error or unknown schema).
236 /// * In case of programmer error see [`FromRow`] docs;
237 /// * In case of unknown schema use [`QueryResult::try_collect`].
238 pub async fn collect<R>(&mut self) -> Result<Vec<R>>
239 where
240 R: FromRow + Send + 'static,
241 {
242 self.reduce(Vec::new(), |mut acc, row| {
243 acc.push(FromRow::from_row(row));
244 acc
245 })
246 .await
247 }
248
249 /// Collects the current result set of this query result.
250 ///
251 /// It works the same way as [`QueryResult::collect`] but won't panic if row isn't convertible
252 /// to `R`.
253 pub async fn try_collect<R>(&mut self) -> Result<Vec<StdResult<R, FromRowError>>>
254 where
255 R: FromRow + Send + 'static,
256 {
257 self.reduce(Vec::new(), |mut acc, row| {
258 acc.push(FromRow::from_row_opt(row));
259 acc
260 })
261 .await
262 }
263
264 /// Collects the current result set of this query result and drops everything else.
265 ///
266 /// # Panic
267 ///
268 /// It'll panic if any row isn't convertible to `R` (i.e. programmer error or unknown schema).
269 /// * In case of programmer error see `FromRow` docs;
270 /// * In case of unknown schema use [`QueryResult::try_collect`].
271 pub async fn collect_and_drop<R>(mut self) -> Result<Vec<R>>
272 where
273 R: FromRow + Send + 'static,
274 {
275 let output = self.collect::<R>().await?;
276 self.drop_result().await?;
277 Ok(output)
278 }
279
280 /// Collects the current result set of this query result and drops everything else.
281 ///
282 /// It works the same way as [`QueryResult::collect_and_drop`] but won't panic if row isn't
283 /// convertible to `R`.
284 pub async fn try_collect_and_drop<R>(mut self) -> Result<Vec<StdResult<R, FromRowError>>>
285 where
286 R: FromRow + Send + 'static,
287 {
288 let output = self.try_collect().await?;
289 self.drop_result().await?;
290 Ok(output)
291 }
292
293 /// Executes `fun` on every row of the current result set.
294 ///
295 /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
296 pub async fn for_each<F>(&mut self, mut fun: F) -> Result<()>
297 where
298 F: FnMut(Row),
299 {
300 if self.is_empty() {
301 Ok(())
302 } else {
303 while let Some(row) = self.next().await? {
304 fun(row);
305 }
306 Ok(())
307 }
308 }
309
310 /// Executes `fun` on every row of the current result set and drops everything else.
311 pub async fn for_each_and_drop<F>(mut self, fun: F) -> Result<()>
312 where
313 F: FnMut(Row),
314 {
315 self.for_each(fun).await?;
316 self.drop_result().await?;
317 Ok(())
318 }
319
320 /// Maps every row of the current result set to `U` using `fun`.
321 ///
322 /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
323 pub async fn map<F, U>(&mut self, mut fun: F) -> Result<Vec<U>>
324 where
325 F: FnMut(Row) -> U,
326 {
327 let mut acc = Vec::new();
328 while let Some(row) = self.next().await? {
329 acc.push(fun(crate::from_row(row)));
330 }
331 Ok(acc)
332 }
333
334 /// Map every row of the current result set to `U` using `fun` and drops everything else.
335 pub async fn map_and_drop<F, U>(mut self, fun: F) -> Result<Vec<U>>
336 where
337 F: FnMut(Row) -> U,
338 {
339 let rows = self.map(fun).await?;
340 self.drop_result().await?;
341 Ok(rows)
342 }
343
344 /// Reduces rows of the current result set to `U` using `fun`.
345 ///
346 /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
347 pub async fn reduce<T, F, U>(&mut self, mut init: U, mut fun: F) -> Result<U>
348 where
349 F: FnMut(U, T) -> U,
350 T: FromRow + Send + 'static,
351 {
352 while let Some(row) = self.next().await? {
353 init = fun(init, crate::from_row(row));
354 }
355 Ok(init)
356 }
357
358 /// Reduces rows of the current result set to `U` using `fun` and drops everything else.
359 pub async fn reduce_and_drop<T, F, U>(mut self, init: U, fun: F) -> Result<U>
360 where
361 F: FnMut(U, T) -> U,
362 T: FromRow + Send + 'static,
363 {
364 let acc = self.reduce(init, fun).await?;
365 self.drop_result().await?;
366 Ok(acc)
367 }
368
369 /// Drops this query result.
370 pub async fn drop_result(mut self) -> Result<()> {
371 loop {
372 while self.next().await?.is_some() {}
373 if !self.conn.has_pending_result() {
374 break Ok(());
375 }
376 }
377 }
378
379 /// Returns a reference to a columns list of this query result.
380 ///
381 /// Empty list means that this result set was never meant to contain rows.
382 pub fn columns_ref(&self) -> &[Column] {
383 self.conn
384 .get_pending_result()
385 .ok()
386 .flatten()
387 .map(|meta| match meta {
388 PendingResult::Pending(meta) => &meta.columns()[..],
389 PendingResult::Taken(meta) => &meta.columns()[..],
390 })
391 .unwrap_or_default()
392 }
393
394 /// Returns a copy of a columns list of this query result.
395 pub fn columns(&self) -> Option<Arc<[Column]>> {
396 self.conn
397 .get_pending_result()
398 .ok()
399 .flatten()
400 .map(|meta| match meta {
401 PendingResult::Pending(meta) => meta.columns(),
402 PendingResult::Taken(meta) => meta.columns(),
403 })
404 .cloned()
405 }
406}