diff --git a/Dapper.StrongName/Dapper.StrongName.csproj b/Dapper.StrongName/Dapper.StrongName.csproj index 023a4eae4..1ed89eecd 100644 --- a/Dapper.StrongName/Dapper.StrongName.csproj +++ b/Dapper.StrongName/Dapper.StrongName.csproj @@ -19,6 +19,7 @@ true + diff --git a/Dapper/Dapper.csproj b/Dapper/Dapper.csproj index 25f87ccf6..d7031a1bd 100644 --- a/Dapper/Dapper.csproj +++ b/Dapper/Dapper.csproj @@ -16,6 +16,7 @@ true + diff --git a/Dapper/SqlMapper.AsyncStream.cs b/Dapper/SqlMapper.AsyncStream.cs new file mode 100644 index 000000000..571bd02d7 --- /dev/null +++ b/Dapper/SqlMapper.AsyncStream.cs @@ -0,0 +1,645 @@ +#if NET5_0 || NETSTANDARD2_0 +using System; +using System.Collections.Generic; +using System.Data; +using System.Data.Common; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Dapper +{ + public static partial class SqlMapper + { + /// + /// Execute a query using an asynchronous stream. + /// + /// The connection to query on. + /// The SQL to execute for the query. + /// The parameters to pass, if any. + /// The transaction to use, if any. + /// The command timeout (in seconds). + /// The type of command to execute. + /// The cancellation token for this query. + /// Note: each row can be accessed via "dynamic", or by casting to an IDictionary<string,object> + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null, CancellationToken cancellationToken = default) => + StreamAsync(cnn, typeof(DapperRow), new CommandDefinition(sql, param, transaction, commandTimeout, commandType, CommandFlags.None, cancellationToken)); + + /// + /// Execute a query using an asynchronous stream. + /// + /// The connection to query on. + /// The command used to query on this connection. + /// Note: each row can be accessed via "dynamic", or by casting to an IDictionary<string,object> + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, CommandDefinition command) => + StreamAsync(cnn, typeof(DapperRow), command); + + /// + /// Execute a query using an asynchronous stream. + /// + /// The type of results to return. + /// The connection to query on. + /// The SQL to execute for the query. + /// The parameters to pass, if any. + /// The transaction to use, if any. + /// The command timeout (in seconds). + /// The type of command to execute. + /// The cancellation token for this query. + /// + /// A sequence of data of ; if a basic type (int, string, etc) is queried then the data from the first column in assumed, otherwise an instance is + /// created per row, and a direct column-name===member-name mapping is assumed (case insensitive). + /// + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null, CancellationToken cancellationToken = default) => + StreamAsync(cnn, typeof(T), new CommandDefinition(sql, param, transaction, commandTimeout, commandType, CommandFlags.None, cancellationToken)); + + /// + /// Execute a query using an asynchronous stream. + /// + /// The connection to query on. + /// The type to return. + /// The SQL to execute for the query. + /// The parameters to pass, if any. + /// The transaction to use, if any. + /// The command timeout (in seconds). + /// The type of command to execute. + /// The cancellation token for this query. + /// is null. + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, Type type, string sql, object param = null, IDbTransaction transaction = null, int? commandTimeout = null, CommandType? commandType = null, CancellationToken cancellationToken = default) + { + if (type == null) throw new ArgumentNullException(nameof(type)); + return StreamAsync(cnn, type, new CommandDefinition(sql, param, transaction, commandTimeout, commandType, CommandFlags.None, cancellationToken)); + } + + /// + /// Execute a query using an asynchronous stream. + /// + /// The type to return. + /// The connection to query on. + /// The command used to query on this connection. + /// + /// A sequence of data of ; if a basic type (int, string, etc) is queried then the data from the first column in assumed, otherwise an instance is + /// created per row, and a direct column-name===member-name mapping is assumed (case insensitive). + /// + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, CommandDefinition command) => + StreamAsync(cnn, typeof(T), command); + + /// + /// Execute a query using an asynchronous stream. + /// + /// The connection to query on. + /// The type to return. + /// The command used to query on this connection. + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, Type type, CommandDefinition command) => + StreamAsync(cnn, type, command); + + private static async IAsyncEnumerable StreamAsync(this IDbConnection cnn, Type effectiveType, CommandDefinition command) + { + object param = command.Parameters; + var identity = new Identity(command.CommandText, command.CommandType, cnn, effectiveType, param?.GetType()); + var info = GetCacheInfo(identity, param, command.AddToCache); + bool wasClosed = cnn.State == ConnectionState.Closed; + var cancel = command.CancellationToken; +#if NET5_0 + await +#endif + using var cmd = command.TrySetupAsyncCommand(cnn, info.ParamReader); + DbDataReader reader = null; + try + { + if (wasClosed) await cnn.TryOpenAsync(cancel).ConfigureAwait(false); + reader = await ExecuteReaderWithFlagsFallbackAsync(cmd, wasClosed, CommandBehavior.SequentialAccess | CommandBehavior.SingleResult, cancel).ConfigureAwait(false); + + var tuple = info.Deserializer; + int hash = GetColumnHash(reader); + if (tuple.Func == null || tuple.Hash != hash) + { + if (reader.FieldCount == 0) + yield break; + tuple = info.Deserializer = new DeserializerState(hash, GetDeserializer(effectiveType, reader, 0, -1, false)); + if (command.AddToCache) SetQueryCache(identity, info); + } + + var func = tuple.Func; + + var buffer = command.Buffered ? new List() : null; + while (await reader.ReadAsync(cancel).ConfigureAwait(false)) + { + var val = GetValue(reader, effectiveType, func(reader)); + + if (buffer == null) yield return val; + else buffer.Add(val); + } + while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ } + command.OnCompleted(); + + if (buffer == null) yield break; + foreach (var result in buffer) yield return result; + } + finally + { +#if NET5_0 + await +#endif + using (reader) { /* dispose if non-null */ } + if (wasClosed) cnn.Close(); + } + } + + /// + /// Perform an asynchronous multi-mapping query with 2 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The SQL to execute for this query. + /// The function to map row types to the return type. + /// The parameters to use for this query. + /// The transaction to use for this query. + /// Whether to buffer the results in memory. + /// The field we should split and read the second object from (default: "Id"). + /// Number of seconds before command execution timeout. + /// Is it a stored proc or a batch? + /// The cancellation token for this query. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, string sql, Func map, object param = null, IDbTransaction transaction = null, bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null, CancellationToken cancellationToken = default) => + MultiMapStreamAsync(cnn, + new CommandDefinition(sql, param, transaction, commandTimeout, commandType, buffered ? CommandFlags.Buffered : CommandFlags.None, cancellationToken), map, splitOn); + + /// + /// Perform an asynchronous multi-mapping query with 2 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The field we should split and read the second object from (default: "Id"). + /// The command to execute. + /// The function to map row types to the return type. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, CommandDefinition command, Func map, string splitOn = "Id") => + MultiMapStreamAsync(cnn, command, map, splitOn); + + /// + /// Perform an asynchronous multi-mapping query with 3 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The third type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The SQL to execute for this query. + /// The function to map row types to the return type. + /// The parameters to use for this query. + /// The transaction to use for this query. + /// Whether to buffer the results in memory. + /// The field we should split and read the second object from (default: "Id"). + /// Number of seconds before command execution timeout. + /// Is it a stored proc or a batch? + /// The cancellation token for this query. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, string sql, Func map, object param = null, IDbTransaction transaction = null, bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null, CancellationToken cancellationToken = default) => + MultiMapStreamAsync(cnn, + new CommandDefinition(sql, param, transaction, commandTimeout, commandType, buffered ? CommandFlags.Buffered : CommandFlags.None, cancellationToken), map, splitOn); + + /// + /// Perform an asynchronous multi-mapping query with 3 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The third type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The field we should split and read the second object from (default: "Id"). + /// The command to execute. + /// The function to map row types to the return type. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, CommandDefinition command, Func map, string splitOn = "Id") => + MultiMapStreamAsync(cnn, command, map, splitOn); + + /// + /// Perform an asynchronous multi-mapping query with 4 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The third type in the recordset. + /// The fourth type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The SQL to execute for this query. + /// The function to map row types to the return type. + /// The parameters to use for this query. + /// The transaction to use for this query. + /// Whether to buffer the results in memory. + /// The field we should split and read the second object from (default: "Id"). + /// Number of seconds before command execution timeout. + /// Is it a stored proc or a batch? + /// The cancellation token for this query. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, string sql, Func map, object param = null, IDbTransaction transaction = null, bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null, CancellationToken cancellationToken = default) => + MultiMapStreamAsync(cnn, + new CommandDefinition(sql, param, transaction, commandTimeout, commandType, buffered ? CommandFlags.Buffered : CommandFlags.None, cancellationToken), map, splitOn); + + /// + /// Perform an asynchronous multi-mapping query with 4 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The third type in the recordset. + /// The fourth type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The field we should split and read the second object from (default: "Id"). + /// The command to execute. + /// The function to map row types to the return type. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, CommandDefinition command, Func map, string splitOn = "Id") => + MultiMapStreamAsync(cnn, command, map, splitOn); + + /// + /// Perform an asynchronous multi-mapping query with 5 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The third type in the recordset. + /// The fourth type in the recordset. + /// The fifth type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The SQL to execute for this query. + /// The function to map row types to the return type. + /// The parameters to use for this query. + /// The transaction to use for this query. + /// Whether to buffer the results in memory. + /// The field we should split and read the second object from (default: "Id"). + /// Number of seconds before command execution timeout. + /// Is it a stored proc or a batch? + /// The cancellation token for this query. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, string sql, Func map, object param = null, IDbTransaction transaction = null, bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null, CancellationToken cancellationToken = default) => + MultiMapStreamAsync(cnn, + new CommandDefinition(sql, param, transaction, commandTimeout, commandType, buffered ? CommandFlags.Buffered : CommandFlags.None, cancellationToken), map, splitOn); + + /// + /// Perform an asynchronous multi-mapping query with 5 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The third type in the recordset. + /// The fourth type in the recordset. + /// The fifth type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The field we should split and read the second object from (default: "Id"). + /// The command to execute. + /// The function to map row types to the return type. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, CommandDefinition command, Func map, string splitOn = "Id") => + MultiMapStreamAsync(cnn, command, map, splitOn); + + /// + /// Perform an asynchronous multi-mapping query with 6 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The third type in the recordset. + /// The fourth type in the recordset. + /// The fifth type in the recordset. + /// The sixth type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The SQL to execute for this query. + /// The function to map row types to the return type. + /// The parameters to use for this query. + /// The transaction to use for this query. + /// Whether to buffer the results in memory. + /// The field we should split and read the second object from (default: "Id"). + /// Number of seconds before command execution timeout. + /// Is it a stored proc or a batch? + /// The cancellation token for this query. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, string sql, Func map, object param = null, IDbTransaction transaction = null, bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null, CancellationToken cancellationToken = default) => + MultiMapStreamAsync(cnn, + new CommandDefinition(sql, param, transaction, commandTimeout, commandType, buffered ? CommandFlags.Buffered : CommandFlags.None, cancellationToken), map, splitOn); + + /// + /// Perform an asynchronous multi-mapping query with 6 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The third type in the recordset. + /// The fourth type in the recordset. + /// The fifth type in the recordset. + /// The sixth type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The field we should split and read the second object from (default: "Id"). + /// The command to execute. + /// The function to map row types to the return type. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, CommandDefinition command, Func map, string splitOn = "Id") => + MultiMapStreamAsync(cnn, command, map, splitOn); + + /// + /// Perform an asynchronous multi-mapping query with 7 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The third type in the recordset. + /// The fourth type in the recordset. + /// The fifth type in the recordset. + /// The sixth type in the recordset. + /// The seventh type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The SQL to execute for this query. + /// The function to map row types to the return type. + /// The parameters to use for this query. + /// The transaction to use for this query. + /// Whether to buffer the results in memory. + /// The field we should split and read the second object from (default: "Id"). + /// Number of seconds before command execution timeout. + /// Is it a stored proc or a batch? + /// The cancellation token for this query. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, string sql, Func map, object param = null, IDbTransaction transaction = null, bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null, CancellationToken cancellationToken = default) => + MultiMapStreamAsync(cnn, + new CommandDefinition(sql, param, transaction, commandTimeout, commandType, buffered ? CommandFlags.Buffered : CommandFlags.None, cancellationToken), map, splitOn); + + /// + /// Perform an asynchronous multi-mapping query with 7 input types. + /// This returns a single type, combined from the raw types via . + /// + /// The first type in the recordset. + /// The second type in the recordset. + /// The third type in the recordset. + /// The fourth type in the recordset. + /// The fifth type in the recordset. + /// The sixth type in the recordset. + /// The seventh type in the recordset. + /// The combined type to return. + /// The connection to query on. + /// The field we should split and read the second object from (default: "Id"). + /// The command to execute. + /// The function to map row types to the return type. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, CommandDefinition command, Func map, string splitOn = "Id") => + MultiMapStreamAsync(cnn, command, map, splitOn); + + private static async IAsyncEnumerable MultiMapStreamAsync(this IDbConnection cnn, CommandDefinition command, Delegate map, string splitOn) + { + object param = command.Parameters; + var identity = new Identity(command.CommandText, command.CommandType, cnn, typeof(TFirst), param?.GetType()); + var info = GetCacheInfo(identity, param, command.AddToCache); + bool wasClosed = cnn.State == ConnectionState.Closed; + CancellationToken cancel = command.CancellationToken; + try + { + if (wasClosed) await cnn.TryOpenAsync(command.CancellationToken).ConfigureAwait(false); +#if NET5_0 + await +#endif + using var cmd = command.TrySetupAsyncCommand(cnn, info.ParamReader); +#if NET5_0 + await +#endif + using var reader = await ExecuteReaderWithFlagsFallbackAsync(cmd, wasClosed, CommandBehavior.SequentialAccess | CommandBehavior.SingleResult, cancel).ConfigureAwait(false); + if (!command.Buffered) wasClosed = false; // handing back open reader; rely on command-behavior + var results = MultiMapStreamImpl(null, CommandDefinition.ForCallback(command.Parameters), map, splitOn, reader, identity, true); + + var buffer = command.Buffered ? new List() : null; + + await foreach (var result in results.WithCancellation(cancel).ConfigureAwait(false)) + { + if (buffer != null) buffer.Add(result); + else yield return result; + } + + if (buffer == null) yield break; + foreach (var result in buffer) yield return result; + } + finally + { + if (wasClosed) cnn.Close(); + } + } + + /// + /// Perform an asynchronous multi-mapping query with an arbitrary number of input types. + /// This returns a single type, combined from the raw types via . + /// + /// The combined type to return. + /// The connection to query on. + /// The SQL to execute for this query. + /// Array of types in the recordset. + /// The function to map row types to the return type. + /// The parameters to use for this query. + /// The transaction to use for this query. + /// Whether to buffer the results in memory. + /// The field we should split and read the second object from (default: "Id"). + /// Number of seconds before command execution timeout. + /// Is it a stored proc or a batch? + /// The cancellation token for this query. + /// An asynchronous stream of . + public static IAsyncEnumerable StreamAsync(this IDbConnection cnn, string sql, Type[] types, Func map, object param = null, IDbTransaction transaction = null, bool buffered = true, string splitOn = "Id", int? commandTimeout = null, CommandType? commandType = null, CancellationToken cancellationToken = default) + { + var command = new CommandDefinition(sql, param, transaction, commandTimeout, commandType, buffered ? CommandFlags.Buffered : CommandFlags.None, cancellationToken); + return MultiMapStreamAsync(cnn, command, types, map, splitOn); + } + + private static async IAsyncEnumerable MultiMapStreamAsync(this IDbConnection cnn, CommandDefinition command, Type[] types, Func map, string splitOn) + { + if (types.Length < 1) + { + throw new ArgumentException("you must provide at least one type to deserialize"); + } + + object param = command.Parameters; + var identity = new IdentityWithTypes(command.CommandText, command.CommandType, cnn, types[0], param?.GetType(), types); + var info = GetCacheInfo(identity, param, command.AddToCache); + bool wasClosed = cnn.State == ConnectionState.Closed; + CancellationToken cancel = command.CancellationToken; + try + { + if (wasClosed) await cnn.TryOpenAsync(cancel).ConfigureAwait(false); +#if NET5_0 + await +#endif + using var cmd = command.TrySetupAsyncCommand(cnn, info.ParamReader); +#if NET5_0 + await +#endif + using var reader = await ExecuteReaderWithFlagsFallbackAsync(cmd, wasClosed, CommandBehavior.SequentialAccess | CommandBehavior.SingleResult, cancel).ConfigureAwait(false); + var results = MultiMapAsyncImpl(null, default, types, map, splitOn, reader, identity, true); + + var buffer = command.Buffered ? new List() : null; + + await foreach (var result in results.WithCancellation(cancel).ConfigureAwait(false)) + { + if (buffer != null) buffer.Add(result); + else yield return result; + } + + if (buffer == null) yield break; + foreach (var result in buffer) yield return result; + } + finally + { + if (wasClosed) cnn.Close(); + } + } + + private static async IAsyncEnumerable MultiMapStreamImpl(this IDbConnection cnn, CommandDefinition command, Delegate map, string splitOn, DbDataReader reader, Identity identity, bool finalize) + { + object param = command.Parameters; + identity ??= new Identity(command.CommandText, command.CommandType, cnn, typeof(TFirst), param?.GetType()); + CacheInfo cinfo = GetCacheInfo(identity, param, command.AddToCache); + CancellationToken cancel = command.CancellationToken; + + DbCommand ownedCommand = null; + DbDataReader ownedReader = null; + + bool wasClosed = cnn?.State == ConnectionState.Closed; + try + { + if (reader == null) + { + ownedCommand = command.TrySetupAsyncCommand(cnn, cinfo.ParamReader); + if (wasClosed) cnn.Open(); + ownedReader = await ExecuteReaderWithFlagsFallbackAsync(ownedCommand, wasClosed, CommandBehavior.SequentialAccess | CommandBehavior.SingleResult, cancel).ConfigureAwait(false); + reader = ownedReader; + } + var deserializer = default(DeserializerState); + Func[] otherDeserializers; + + int hash = GetColumnHash(reader); + if ((deserializer = cinfo.Deserializer).Func == null || (otherDeserializers = cinfo.OtherDeserializers) == null || hash != deserializer.Hash) + { + var deserializers = GenerateDeserializers(identity, splitOn, reader); + deserializer = cinfo.Deserializer = new DeserializerState(hash, deserializers[0]); + otherDeserializers = cinfo.OtherDeserializers = deserializers.Skip(1).ToArray(); + if (command.AddToCache) SetQueryCache(identity, cinfo); + } + + Func mapIt = GenerateMapper(deserializer.Func, otherDeserializers, map); + + if (mapIt != null) + { + while (await reader.ReadAsync(cancel).ConfigureAwait(false)) + { + yield return mapIt(reader); + } + if (finalize) + { + while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore remaining result sets */ } + command.OnCompleted(); + } + } + } + finally + { + try + { +#if NET5_0 + await +#endif + using (ownedReader) { /* dispose if non-null */ } + } + finally + { + ownedCommand?.Parameters.Clear(); +#if NET5_0 + await +#endif + using (ownedCommand) { /* dispose if non-null */ } + if (wasClosed) cnn.Close(); + } + } + } + + private static async IAsyncEnumerable MultiMapAsyncImpl(this IDbConnection cnn, CommandDefinition command, Type[] types, Func map, string splitOn, DbDataReader reader, Identity identity, bool finalize) + { + if (types.Length < 1) + { + throw new ArgumentException("you must provide at least one type to deserialize"); + } + + object param = command.Parameters; + identity ??= new IdentityWithTypes(command.CommandText, command.CommandType, cnn, types[0], param?.GetType(), types); + CacheInfo cinfo = GetCacheInfo(identity, param, command.AddToCache); + CancellationToken cancel = command.CancellationToken; + + DbCommand ownedCommand = null; + DbDataReader ownedReader = null; + + bool wasClosed = cnn?.State == ConnectionState.Closed; + try + { + if (reader == null) + { + ownedCommand = command.TrySetupAsyncCommand(cnn, cinfo.ParamReader); + if (wasClosed) cnn.Open(); + ownedReader = await ExecuteReaderWithFlagsFallbackAsync(ownedCommand, wasClosed, CommandBehavior.SequentialAccess | CommandBehavior.SingleResult, cancel).ConfigureAwait(false); + reader = ownedReader; + } + DeserializerState deserializer; + Func[] otherDeserializers; + + int hash = GetColumnHash(reader); + if ((deserializer = cinfo.Deserializer).Func == null || (otherDeserializers = cinfo.OtherDeserializers) == null || hash != deserializer.Hash) + { + var deserializers = GenerateDeserializers(identity, splitOn, reader); + deserializer = cinfo.Deserializer = new DeserializerState(hash, deserializers[0]); + otherDeserializers = cinfo.OtherDeserializers = deserializers.Skip(1).ToArray(); + SetQueryCache(identity, cinfo); + } + + Func mapIt = GenerateMapper(types.Length, deserializer.Func, otherDeserializers, map); + + if (mapIt != null) + { + while (await reader.ReadAsync(cancel).ConfigureAwait(false)) + { + yield return mapIt(reader); + } + if (finalize) + { + while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore remaining result sets */ } + command.OnCompleted(); + } + } + } + finally + { + try + { +#if NET5_0 + await +#endif + using (ownedReader) { /* dispose if non-null */ } + } + finally + { + ownedCommand?.Parameters.Clear(); +#if NET5_0 + await +#endif + using (ownedCommand) { /* dispose if non-null */ } + if (wasClosed) cnn.Close(); + } + } + } + } +} +#endif // NET5_0 || NETSTANDARD2_0 diff --git a/Dapper/SqlMapper.GridReader.AsyncStream.cs b/Dapper/SqlMapper.GridReader.AsyncStream.cs new file mode 100644 index 000000000..59c43b5c3 --- /dev/null +++ b/Dapper/SqlMapper.GridReader.AsyncStream.cs @@ -0,0 +1,100 @@ +#if NET5_0 || NETSTANDARD2_0 +using System; +using System.Collections.Generic; +using System.Data; +using System.Data.Common; +using System.Linq; +using System.Threading.Tasks; + +namespace Dapper +{ + public static partial class SqlMapper + { + public partial class GridReader + { + /// + /// Read the next grid of results, returned as a dynamic object + /// + /// Note: each row can be accessed via "dynamic", or by casting to an IDictionary<string,object> + /// Whether to buffer the results. + public IAsyncEnumerable StreamAsync(bool buffered = false) => StreamAsyncImpl(typeof(DapperRow), buffered); + + /// + /// Read the next grid of results + /// + /// The type to read. + /// Whether to buffer the results. + /// is null. + public IAsyncEnumerable StreamAsync(Type type, bool buffered = false) + { + if (type == null) throw new ArgumentNullException(nameof(type)); + return StreamAsyncImpl(type, buffered); + } + + /// + /// Read the next grid of results. + /// + /// The type to read. + /// Whether the results should be buffered in memory. + public IAsyncEnumerable StreamAsync(bool buffered = false) => StreamAsyncImpl(typeof(T), buffered); + + private async IAsyncEnumerable StreamAsyncImpl(Type type, bool buffered) + { + if (reader == null) throw new ObjectDisposedException(GetType().FullName, "The reader has been disposed; this can happen after all data has been consumed"); + if (IsConsumed) throw new InvalidOperationException("Query results must be consumed in the correct order, and each result can only be consumed once"); + var typedIdentity = identity.ForGrid(type, gridIndex); + CacheInfo cache = GetCacheInfo(typedIdentity, null, addToCache); + var deserializer = cache.Deserializer; + + int hash = GetColumnHash(reader); + if (deserializer.Func == null || deserializer.Hash != hash) + { + deserializer = new DeserializerState(hash, GetDeserializer(type, reader, 0, -1, false)); + cache.Deserializer = deserializer; + } + IsConsumed = true; + if (reader is DbDataReader) + { + var buffer = buffered ? new List() : null; + + await foreach (var value in ReadStreamAsync(gridIndex, deserializer.Func).WithCancellation(cancel).ConfigureAwait(false)) + { + if (buffer != null) buffer.Add(value); + else yield return value; + } + + if (buffer == null) yield break; + foreach (var value in buffer) yield return value; + } + else + { + var result = ReadDeferred(gridIndex, deserializer.Func, type); + if (buffered) result = result?.ToList(); // for the "not a DbDataReader" scenario + + if(result == null) yield break; + foreach (var value in result) yield return value; + } + } + + private async IAsyncEnumerable ReadStreamAsync(int index, Func deserializer) + { + try + { + var dbReader = (DbDataReader)this.reader; + while (index == gridIndex && await dbReader.ReadAsync(cancel).ConfigureAwait(false)) + { + yield return ConvertTo(deserializer(dbReader)); + } + } + finally // finally so that First etc progresses things even when multiple rows + { + if (index == gridIndex) + { + await NextResultAsync().ConfigureAwait(false); + } + } + } + } + } +} +#endif // NET5_0 || NETSTANDARD2_0 diff --git a/tests/Dapper.Tests/AsyncStreamTests.cs b/tests/Dapper.Tests/AsyncStreamTests.cs new file mode 100644 index 000000000..919e219a7 --- /dev/null +++ b/tests/Dapper.Tests/AsyncStreamTests.cs @@ -0,0 +1,607 @@ +#if NET5_0 || NETCOREAPP3_1 +using System.Linq; +using System.Data; +using System.Diagnostics; +using System; +using System.Threading.Tasks; +using System.Threading; +using Xunit; +using System.Data.Common; +using Xunit.Abstractions; + +namespace Dapper.Tests +{ + [Collection(NonParallelDefinition.Name)] + public sealed class SystemSqlClientAsyncStreamTests : AsyncStreamTests { } +#if MSSQLCLIENT + [Collection(NonParallelDefinition.Name)] + public sealed class MicrosoftSqlClientAsyncStreamTests : AsyncStreamTests { } +#endif + + [Collection(NonParallelDefinition.Name)] + public sealed class SystemSqlClientAsyncStreamCacheTests : AsyncStreamCacheTests + { + public SystemSqlClientAsyncStreamCacheTests(ITestOutputHelper log) : base(log) { } + } +#if MSSQLCLIENT + [Collection(NonParallelDefinition.Name)] + public sealed class MicrosoftSqlClientAsyncStreamCacheTests : AsyncStreamCacheTests + { + public MicrosoftSqlClientAsyncStreamCacheTests(ITestOutputHelper log) : base(log) { } + } +#endif + + public abstract class AsyncStreamTests : TestBase where TProvider : SqlServerDatabaseProvider + { + private DbConnection _marsConnection; + + private DbConnection MarsConnection => _marsConnection ??= Provider.GetOpenConnection(true); + + [Fact] + public async Task TestBasicStringUsageAsync() + { + var query = connection.StreamAsync("select 'abc' as [Value] union all select @txt", new { txt = "def" }); + var arr = await query.ToListAsync().ConfigureAwait(false); + Assert.Equal(new[] { "abc", "def" }, arr); + } + + [Fact] + public async Task TestBasicStringUsageAsyncNonBuffered() + { + var query = connection.StreamAsync(new CommandDefinition("select 'abc' as [Value] union all select @txt", new { txt = "def" }, flags: CommandFlags.None)); + var arr = await query.ToArrayAsync().ConfigureAwait(false); + Assert.Equal(new[] { "abc", "def" }, arr); + } + + [Fact] + public async Task TestLongOperationWithCancellation() + { + CancellationTokenSource cancel = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + var task = connection.StreamAsync(new CommandDefinition("waitfor delay '00:00:10';select 1", cancellationToken: cancel.Token)).ToListAsync().AsTask(); + try + { + if (!task.Wait(TimeSpan.FromSeconds(7))) + { + await task; + throw new TimeoutException(); // should have cancelled + } + } + catch (AggregateException agg) + { + Assert.True(agg.InnerException.GetType().Name == "SqlException"); + } + } + + [Fact] + public async Task TestBasicStringUsageClosedAsync() + { + using (var conn = GetClosedConnection()) + { + var query = conn.StreamAsync("select 'abc' as [Value] union all select @txt", new { txt = "def" }); + var arr = await query.ToArrayAsync().ConfigureAwait(false); + Assert.Equal(new[] { "abc", "def" }, arr); + } + } + + [Fact] + public async Task TestQueryDynamicAsync() + { + var row = await connection.StreamAsync("select 'abc' as [Value]").SingleAsync().ConfigureAwait(false); + string value = row.Value; + Assert.Equal("abc", value); + } + + [Fact] + public async Task TestClassWithStringUsageAsync() + { + var query = connection.StreamAsync("select 'abc' as [Value] union all select @txt", new { txt = "def" }); + var arr = await query.ToArrayAsync().ConfigureAwait(false); + Assert.Equal(new[] { "abc", "def" }, arr.Select(x => x.Value)); + } + + [Fact] + public async Task TestMultiMapWithSplitAsync() + { + const string sql = "select 1 as id, 'abc' as name, 2 as id, 'def' as name"; + var productQuery = connection.StreamAsync(sql, (prod, cat) => + { + prod.Category = cat; + return prod; + }); + + var product = await productQuery.FirstAsync().ConfigureAwait(false); + // assertions + Assert.Equal(1, product.Id); + Assert.Equal("abc", product.Name); + Assert.Equal(2, product.Category.Id); + Assert.Equal("def", product.Category.Name); + } + + [Fact] + public async Task TestMultiMapArbitraryWithSplitAsync() + { + const string sql = "select 1 as id, 'abc' as name, 2 as id, 'def' as name"; + var productQuery = connection.StreamAsync(sql, new[] { typeof(Product), typeof(Category) }, (objects) => + { + var prod = (Product)objects[0]; + prod.Category = (Category)objects[1]; + return prod; + }); + + var product = await productQuery.FirstAsync().ConfigureAwait(false); + // assertions + Assert.Equal(1, product.Id); + Assert.Equal("abc", product.Name); + Assert.Equal(2, product.Category.Id); + Assert.Equal("def", product.Category.Name); + } + + [Fact] + public async Task TestMultiMapWithSplitClosedConnAsync() + { + const string sql = "select 1 as id, 'abc' as name, 2 as id, 'def' as name"; + using (var conn = GetClosedConnection()) + { + var productQuery = conn.StreamAsync(sql, (prod, cat) => + { + prod.Category = cat; + return prod; + }); + + var product = await productQuery.FirstAsync().ConfigureAwait(false); + // assertions + Assert.Equal(1, product.Id); + Assert.Equal("abc", product.Name); + Assert.Equal(2, product.Category.Id); + Assert.Equal("def", product.Category.Name); + } + } + + [Fact] + public async Task TestMultiAsync() + { + using (SqlMapper.GridReader multi = await connection.QueryMultipleAsync("select 1; select 2").ConfigureAwait(false)) + { + Assert.Equal(1, multi.StreamAsync().SingleAsync().Result); + Assert.Equal(2, multi.StreamAsync().SingleAsync().Result); + } + } + + [Fact] + public async Task TestMultiConversionAsync() + { + using (SqlMapper.GridReader multi = await connection.QueryMultipleAsync("select Cast(1 as BigInt) Col1; select Cast(2 as BigInt) Col2").ConfigureAwait(false)) + { + Assert.Equal(1, multi.StreamAsync().SingleAsync().Result); + Assert.Equal(2, multi.StreamAsync().SingleAsync().Result); + } + } + + [Fact] + public async Task TestMultiAsyncViaFirstOrDefault() + { + using (SqlMapper.GridReader multi = await connection.QueryMultipleAsync("select 1; select 2; select 3; select 4; select 5").ConfigureAwait(false)) + { + Assert.Equal(1, multi.ReadFirstOrDefaultAsync().Result); + Assert.Equal(2, multi.StreamAsync().SingleAsync().Result); + Assert.Equal(3, multi.ReadFirstOrDefaultAsync().Result); + Assert.Equal(4, multi.StreamAsync().SingleAsync().Result); + Assert.Equal(5, multi.ReadFirstOrDefaultAsync().Result); + } + } + + [Fact] + public async Task TestMultiClosedConnAsync() + { + using (var conn = GetClosedConnection()) + { + using (SqlMapper.GridReader multi = await conn.QueryMultipleAsync("select 1; select 2").ConfigureAwait(false)) + { + Assert.Equal(1, multi.StreamAsync().SingleAsync().Result); + Assert.Equal(2, multi.StreamAsync().SingleAsync().Result); + } + } + } + + [Fact] + public async Task TestMultiClosedConnAsyncViaFirstOrDefault() + { + using (var conn = GetClosedConnection()) + { + using (SqlMapper.GridReader multi = await conn.QueryMultipleAsync("select 1; select 2; select 3; select 4; select 5").ConfigureAwait(false)) + { + Assert.Equal(1, multi.ReadFirstOrDefaultAsync().Result); + Assert.Equal(2, multi.StreamAsync().SingleAsync().Result); + Assert.Equal(3, multi.ReadFirstOrDefaultAsync().Result); + Assert.Equal(4, multi.StreamAsync().SingleAsync().Result); + Assert.Equal(5, multi.ReadFirstOrDefaultAsync().Result); + } + } + } + + private static async Task LiteralReplacement(IDbConnection conn) + { + try + { + await conn.ExecuteAsync("drop table literal1").ConfigureAwait(false); + } + catch { /* don't care */ } + await conn.ExecuteAsync("create table literal1 (id int not null, foo int not null)").ConfigureAwait(false); + await conn.ExecuteAsync("insert literal1 (id,foo) values ({=id}, @foo)", new { id = 123, foo = 456 }).ConfigureAwait(false); + var rows = new[] { new { id = 1, foo = 2 }, new { id = 3, foo = 4 } }; + await conn.ExecuteAsync("insert literal1 (id,foo) values ({=id}, @foo)", rows).ConfigureAwait(false); + var count = await conn.StreamAsync("select count(1) from literal1 where id={=foo}", new { foo = 123 }).SingleAsync().ConfigureAwait(false); + Assert.Equal(1, count); + int sum = await conn.StreamAsync("select sum(id) + sum(foo) from literal1").SingleAsync().ConfigureAwait(false); + Assert.Equal(sum, 123 + 456 + 1 + 2 + 3 + 4); + } + + private static async Task LiteralReplacementDynamic(IDbConnection conn) + { + var args = new DynamicParameters(); + args.Add("id", 123); + try { await conn.ExecuteAsync("drop table literal2").ConfigureAwait(false); } + catch { /* don't care */ } + await conn.ExecuteAsync("create table literal2 (id int not null)").ConfigureAwait(false); + await conn.ExecuteAsync("insert literal2 (id) values ({=id})", args).ConfigureAwait(false); + + args = new DynamicParameters(); + args.Add("foo", 123); + var count = await conn.StreamAsync("select count(1) from literal2 where id={=foo}", args).SingleAsync().ConfigureAwait(false); + Assert.Equal(1, count); + } + + [Fact] + public async Task LiteralInAsync() + { + await connection.ExecuteAsync("create table #literalin(id int not null);").ConfigureAwait(false); + await connection.ExecuteAsync("insert #literalin (id) values (@id)", new[] { + new { id = 1 }, + new { id = 2 }, + new { id = 3 }, + }).ConfigureAwait(false); + var count = await connection.StreamAsync("select count(1) from #literalin where id in {=ids}", + new { ids = new[] { 1, 3, 4 } }).SingleAsync().ConfigureAwait(false); + Assert.Equal(2, count); + } + + private class BasicType + { + public string Value { get; set; } + } + + [Fact] + public async Task TypeBasedViaTypeAsync() + { + Type type = Common.GetSomeType(); + + dynamic actual = await MarsConnection.StreamAsync(type, "select @A as [A], @B as [B]", new { A = 123, B = "abc" }).FirstOrDefaultAsync().ConfigureAwait(false); + Assert.Equal(((object)actual).GetType(), type); + int a = actual.A; + string b = actual.B; + Assert.Equal(123, a); + Assert.Equal("abc", b); + } + + [Fact] + public async Task Issue346_StreamAsyncConvert() + { + int i = await connection.StreamAsync("Select Cast(123 as bigint)").FirstAsync().ConfigureAwait(false); + Assert.Equal(123, i); + } + + [Fact] + public async Task TestSupportForDynamicParametersOutputExpressions_Query_Default() + { + var bob = new Person { Name = "bob", PersonId = 1, Address = new Address { PersonId = 2 } }; + + var p = new DynamicParameters(bob); + p.Output(bob, b => b.PersonId); + p.Output(bob, b => b.Occupation); + p.Output(bob, b => b.NumberOfLegs); + p.Output(bob, b => b.Address.Name); + p.Output(bob, b => b.Address.PersonId); + + var result = await connection.StreamAsync(@" +SET @Occupation = 'grillmaster' +SET @PersonId = @PersonId + 1 +SET @NumberOfLegs = @NumberOfLegs - 1 +SET @AddressName = 'bobs burgers' +SET @AddressPersonId = @PersonId +select 42", p).SingleAsync().ConfigureAwait(false); + + Assert.Equal("grillmaster", bob.Occupation); + Assert.Equal(2, bob.PersonId); + Assert.Equal(1, bob.NumberOfLegs); + Assert.Equal("bobs burgers", bob.Address.Name); + Assert.Equal(2, bob.Address.PersonId); + Assert.Equal(42, result); + } + + [Fact] + public async Task TestSupportForDynamicParametersOutputExpressions_Query_BufferedAsync() + { + var bob = new Person { Name = "bob", PersonId = 1, Address = new Address { PersonId = 2 } }; + + var p = new DynamicParameters(bob); + p.Output(bob, b => b.PersonId); + p.Output(bob, b => b.Occupation); + p.Output(bob, b => b.NumberOfLegs); + p.Output(bob, b => b.Address.Name); + p.Output(bob, b => b.Address.PersonId); + + var result = await connection.StreamAsync(new CommandDefinition(@" +SET @Occupation = 'grillmaster' +SET @PersonId = @PersonId + 1 +SET @NumberOfLegs = @NumberOfLegs - 1 +SET @AddressName = 'bobs burgers' +SET @AddressPersonId = @PersonId +select 42", p, flags: CommandFlags.Buffered)).SingleAsync().ConfigureAwait(false); + + Assert.Equal("grillmaster", bob.Occupation); + Assert.Equal(2, bob.PersonId); + Assert.Equal(1, bob.NumberOfLegs); + Assert.Equal("bobs burgers", bob.Address.Name); + Assert.Equal(2, bob.Address.PersonId); + Assert.Equal(42, result); + } + + [Fact] + public async Task TestSupportForDynamicParametersOutputExpressions_Query_NonBufferedAsync() + { + var bob = new Person { Name = "bob", PersonId = 1, Address = new Address { PersonId = 2 } }; + + var p = new DynamicParameters(bob); + p.Output(bob, b => b.PersonId); + p.Output(bob, b => b.Occupation); + p.Output(bob, b => b.NumberOfLegs); + p.Output(bob, b => b.Address.Name); + p.Output(bob, b => b.Address.PersonId); + + var result = await connection.StreamAsync(new CommandDefinition(@" +SET @Occupation = 'grillmaster' +SET @PersonId = @PersonId + 1 +SET @NumberOfLegs = @NumberOfLegs - 1 +SET @AddressName = 'bobs burgers' +SET @AddressPersonId = @PersonId +select 42", p, flags: CommandFlags.None)).SingleAsync().ConfigureAwait(false); + + Assert.Equal("grillmaster", bob.Occupation); + Assert.Equal(2, bob.PersonId); + Assert.Equal(1, bob.NumberOfLegs); + Assert.Equal("bobs burgers", bob.Address.Name); + Assert.Equal(2, bob.Address.PersonId); + Assert.Equal(42, result); + } + + [Fact] + public async Task TestSupportForDynamicParametersOutputExpressions_QueryMultipleAsync() + { + var bob = new Person { Name = "bob", PersonId = 1, Address = new Address { PersonId = 2 } }; + + var p = new DynamicParameters(bob); + p.Output(bob, b => b.PersonId); + p.Output(bob, b => b.Occupation); + p.Output(bob, b => b.NumberOfLegs); + p.Output(bob, b => b.Address.Name); + p.Output(bob, b => b.Address.PersonId); + + int x, y; + using (var multi = await connection.QueryMultipleAsync(@" +SET @Occupation = 'grillmaster' +SET @PersonId = @PersonId + 1 +SET @NumberOfLegs = @NumberOfLegs - 1 +SET @AddressName = 'bobs burgers' +select 42 +select 17 +SET @AddressPersonId = @PersonId", p).ConfigureAwait(false)) + { + x = multi.StreamAsync().SingleAsync().Result; + y = multi.StreamAsync().SingleAsync().Result; + } + + Assert.Equal("grillmaster", bob.Occupation); + Assert.Equal(2, bob.PersonId); + Assert.Equal(1, bob.NumberOfLegs); + Assert.Equal("bobs burgers", bob.Address.Name); + Assert.Equal(2, bob.Address.PersonId); + Assert.Equal(42, x); + Assert.Equal(17, y); + } + + [Fact] + public async Task TestSubsequentQueriesSuccessAsync() + { + var data0 = await connection.StreamAsync("select 1 as [Id] where 1 = 0").ToListAsync().ConfigureAwait(false); + Assert.Empty(data0); + + var data1 = await connection.StreamAsync(new CommandDefinition("select 1 as [Id] where 1 = 0", flags: CommandFlags.Buffered)).ToListAsync().ConfigureAwait(false); + Assert.Empty(data1); + + var data2 = await connection.StreamAsync(new CommandDefinition("select 1 as [Id] where 1 = 0", flags: CommandFlags.None)).ToListAsync().ConfigureAwait(false); + Assert.Empty(data2); + + data0 = await connection.StreamAsync("select 1 as [Id] where 1 = 0").ToListAsync().ConfigureAwait(false); + Assert.Empty(data0); + + data1 = await connection.StreamAsync(new CommandDefinition("select 1 as [Id] where 1 = 0", flags: CommandFlags.Buffered)).ToListAsync().ConfigureAwait(false); + Assert.Empty(data1); + + data2 = await connection.StreamAsync(new CommandDefinition("select 1 as [Id] where 1 = 0", flags: CommandFlags.None)).ToListAsync().ConfigureAwait(false); + Assert.Empty(data2); + } + + private class AsyncFoo0 { public int Id { get; set; } } + + private class AsyncFoo1 { public int Id { get; set; } } + + private class AsyncFoo2 { public int Id { get; set; } } + + [Fact] + public async Task TestMultiMapArbitraryMapsAsync() + { + // please excuse the trite example, but it is easier to follow than a more real-world one + const string createSql = @" + create table #ReviewBoards (Id int, Name varchar(20), User1Id int, User2Id int, User3Id int, User4Id int, User5Id int, User6Id int, User7Id int, User8Id int, User9Id int) + create table #Users (Id int, Name varchar(20)) + + insert #Users values(1, 'User 1') + insert #Users values(2, 'User 2') + insert #Users values(3, 'User 3') + insert #Users values(4, 'User 4') + insert #Users values(5, 'User 5') + insert #Users values(6, 'User 6') + insert #Users values(7, 'User 7') + insert #Users values(8, 'User 8') + insert #Users values(9, 'User 9') + + insert #ReviewBoards values(1, 'Review Board 1', 1, 2, 3, 4, 5, 6, 7, 8, 9) +"; + await connection.ExecuteAsync(createSql).ConfigureAwait(false); + try + { + const string sql = @" + select + rb.Id, rb.Name, + u1.*, u2.*, u3.*, u4.*, u5.*, u6.*, u7.*, u8.*, u9.* + from #ReviewBoards rb + inner join #Users u1 on u1.Id = rb.User1Id + inner join #Users u2 on u2.Id = rb.User2Id + inner join #Users u3 on u3.Id = rb.User3Id + inner join #Users u4 on u4.Id = rb.User4Id + inner join #Users u5 on u5.Id = rb.User5Id + inner join #Users u6 on u6.Id = rb.User6Id + inner join #Users u7 on u7.Id = rb.User7Id + inner join #Users u8 on u8.Id = rb.User8Id + inner join #Users u9 on u9.Id = rb.User9Id +"; + + var types = new[] { typeof(ReviewBoard), typeof(User), typeof(User), typeof(User), typeof(User), typeof(User), typeof(User), typeof(User), typeof(User), typeof(User) }; + + Func mapper = (objects) => + { + var board = (ReviewBoard)objects[0]; + board.User1 = (User)objects[1]; + board.User2 = (User)objects[2]; + board.User3 = (User)objects[3]; + board.User4 = (User)objects[4]; + board.User5 = (User)objects[5]; + board.User6 = (User)objects[6]; + board.User7 = (User)objects[7]; + board.User8 = (User)objects[8]; + board.User9 = (User)objects[9]; + return board; + }; + + var data = await connection.StreamAsync(sql, types, mapper).ToListAsync().ConfigureAwait(false); + + var p = data[0]; + Assert.Equal(1, p.Id); + Assert.Equal("Review Board 1", p.Name); + Assert.Equal(1, p.User1.Id); + Assert.Equal(2, p.User2.Id); + Assert.Equal(3, p.User3.Id); + Assert.Equal(4, p.User4.Id); + Assert.Equal(5, p.User5.Id); + Assert.Equal(6, p.User6.Id); + Assert.Equal(7, p.User7.Id); + Assert.Equal(8, p.User8.Id); + Assert.Equal(9, p.User9.Id); + Assert.Equal("User 1", p.User1.Name); + Assert.Equal("User 2", p.User2.Name); + Assert.Equal("User 3", p.User3.Name); + Assert.Equal("User 4", p.User4.Name); + Assert.Equal("User 5", p.User5.Name); + Assert.Equal("User 6", p.User6.Name); + Assert.Equal("User 7", p.User7.Name); + Assert.Equal("User 8", p.User8.Name); + Assert.Equal("User 9", p.User9.Name); + } + finally + { + connection.Execute("drop table #Users drop table #ReviewBoards"); + } + } + + [Fact] + public async Task Issue157_ClosedReaderAsync() + { + var args = new { x = 42 }; + const string sql = "select 123 as [A], 'abc' as [B] where @x=42"; + var row = await connection.StreamAsync(new CommandDefinition( + sql, args, flags: CommandFlags.None)).SingleAsync().ConfigureAwait(false); + Assert.NotNull(row); + Assert.Equal(123, row.A); + Assert.Equal("abc", row.B); + + args = new { x = 5 }; + Assert.False(await connection.StreamAsync(new CommandDefinition(sql, args, flags: CommandFlags.None)).AnyAsync().ConfigureAwait(false)); + } + + [Fact] + public async Task TestAtEscaping() + { + var id = await connection.StreamAsync(@" + declare @@Name int + select @@Name = @Id+1 + select @@Name + ", new Product { Id = 1 }).SingleAsync().ConfigureAwait(false); + Assert.Equal(2, id); + } + + [Fact] + public async Task Issue563_StreamAsyncShouldThrowException() + { + try + { + var data = await connection.StreamAsync("select 1 union all select 2; RAISERROR('after select', 16, 1);").ToListAsync().ConfigureAwait(false); + Assert.True(false, "Expected Exception"); + } + catch (Exception ex) when (ex.GetType().Name == "SqlException" && ex.Message == "after select") { /* swallow only this */ } + } + } + + [Collection(NonParallelDefinition.Name)] + public abstract class AsyncStreamCacheTests : TestBase where TProvider : SqlServerDatabaseProvider + { + private readonly ITestOutputHelper _log; + public AsyncStreamCacheTests(ITestOutputHelper log) => _log = log; + private DbConnection _marsConnection; + private DbConnection MarsConnection => _marsConnection ??= Provider.GetOpenConnection(true); + + public override void Dispose() + { + _marsConnection?.Dispose(); + _marsConnection = null; + base.Dispose(); + } + + [Fact] + public async Task AssertNoCacheWorksForQueryMultiple() + { + const int a = 123, b = 456; + var cmdDef = new CommandDefinition("select @a; select @b;", new + { + a, + b + }, commandType: CommandType.Text, flags: CommandFlags.NoCache); + + int c, d; + SqlMapper.PurgeQueryCache(); + int before = SqlMapper.GetCachedSQLCount(); + using (var multi = await MarsConnection.QueryMultipleAsync(cmdDef)) + { + c = await multi.StreamAsync().SingleAsync(); + d = await multi.StreamAsync().SingleAsync(); + } + int after = SqlMapper.GetCachedSQLCount(); + _log?.WriteLine($"before: {before}; after: {after}"); + // too brittle in concurrent tests to assert + // Assert.Equal(0, before); + // Assert.Equal(0, after); + Assert.Equal(123, c); + Assert.Equal(456, d); + } + } +} +#endif // NET5_0 || NETCOREAPP3_1 diff --git a/tests/Dapper.Tests/Dapper.Tests.csproj b/tests/Dapper.Tests/Dapper.Tests.csproj index b9f8b9347..a765a55a2 100644 --- a/tests/Dapper.Tests/Dapper.Tests.csproj +++ b/tests/Dapper.Tests/Dapper.Tests.csproj @@ -29,6 +29,9 @@ + + + diff --git a/tests/Dapper.Tests/MiscTests.cs b/tests/Dapper.Tests/MiscTests.cs index 4d5dd26c4..aef94823d 100644 --- a/tests/Dapper.Tests/MiscTests.cs +++ b/tests/Dapper.Tests/MiscTests.cs @@ -223,6 +223,11 @@ public async Task TestConversionExceptionMessages() list = await connection.QueryAsync(sql); Assert.Null(Assert.Single(list)); +#if NETCOREAPP3_1 || NET5_0 + list = await connection.StreamAsync(sql).ToListAsync(); + Assert.Null(Assert.Single(list)); +#endif + // Single row paths Assert.Null(connection.QueryFirst(sql)); Assert.Null(connection.QueryFirstOrDefault(sql)); @@ -257,6 +262,10 @@ static async Task TestExceptionsAsync(DbConnection connection, string sql, st Assert.Equal(exception, ex.Message); ex = await Assert.ThrowsAsync(() => connection.QuerySingleOrDefaultAsync(sql)); Assert.Equal(exception, ex.Message); +#if NETCOREAPP3_1 || NET5_0 + ex = await Assert.ThrowsAsync(async () => await connection.StreamAsync(sql).ToListAsync()); + Assert.Equal(exception, ex.Message); +#endif } // Null value throws