diff --git a/src/EventStore.Client.Streams/EventStoreClient.Append.cs b/src/EventStore.Client.Streams/EventStoreClient.Append.cs index 01ed58a0e..2cf59db32 100644 --- a/src/EventStore.Client.Streams/EventStoreClient.Append.cs +++ b/src/EventStore.Client.Streams/EventStoreClient.Append.cs @@ -92,51 +92,38 @@ public async Task AppendToStreamAsync( } private async ValueTask AppendToStreamInternal( - CallInvoker callInvoker, - AppendReq header, - IEnumerable eventData, - EventStoreClientOperationOptions operationOptions, - TimeSpan? deadline, - UserCredentials? userCredentials, - CancellationToken cancellationToken - ) { + CallInvoker callInvoker, + AppendReq header, + IEnumerable eventData, + EventStoreClientOperationOptions operationOptions, + TimeSpan? deadline, + UserCredentials? userCredentials, + CancellationToken cancellationToken + ) { using var call = new Streams.Streams.StreamsClient(callInvoker).Append( - EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) - ); + EventStoreCallOptions.CreateNonStreaming(Settings, deadline, userCredentials, cancellationToken) + ); - try { - await call.RequestStream.WriteAsync(header).ConfigureAwait(false); - foreach (var e in eventData) { - await call.RequestStream.WriteAsync( - new AppendReq { - ProposedMessage = new AppendReq.Types.ProposedMessage { - Id = e.EventId.ToDto(), - Data = ByteString.CopyFrom(e.Data.Span), - CustomMetadata = ByteString.CopyFrom(e.Metadata.Span), - Metadata = { - { Constants.Metadata.Type, e.Type }, - { Constants.Metadata.ContentType, e.ContentType } - } - }, - } - ).ConfigureAwait(false); - } + await call.RequestStream.WriteAsync(header).ConfigureAwait(false); - await call.RequestStream.CompleteAsync().ConfigureAwait(false); - } catch (InvalidOperationException exc) { - _log.LogDebug( - exc, - "Got InvalidOperationException when appending events to stream - {streamName}. This is perfectly normal if the connection was closed from the server-side.", - header.Options.StreamIdentifier - ); - } catch (RpcException exc) { - _log.LogDebug( - exc, - "Got RpcException when appending events to stream - {streamName}. This is perfectly normal if the connection was closed from the server-side.", - header.Options.StreamIdentifier - ); + foreach (var e in eventData) { + await call.RequestStream.WriteAsync( + new AppendReq { + ProposedMessage = new AppendReq.Types.ProposedMessage { + Id = e.EventId.ToDto(), + Data = ByteString.CopyFrom(e.Data.Span), + CustomMetadata = ByteString.CopyFrom(e.Metadata.Span), + Metadata = { + { Constants.Metadata.Type, e.Type }, + { Constants.Metadata.ContentType, e.ContentType } + } + }, + } + ).ConfigureAwait(false); } + await call.RequestStream.CompleteAsync().ConfigureAwait(false); + var response = await call.ResponseAsync.ConfigureAwait(false); if (response.Success != null) diff --git a/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs b/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs index d24dd2a8e..0f1368805 100644 --- a/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs +++ b/src/EventStore.Client/Interceptors/TypedExceptionInterceptor.cs @@ -1,4 +1,3 @@ -using System.Diagnostics.CodeAnalysis; using Grpc.Core; using Grpc.Core.Interceptors; using static EventStore.Client.Constants; @@ -55,7 +54,7 @@ AsyncClientStreamingCallContinuation continuation var response = continuation(context); return new AsyncClientStreamingCall( - response.RequestStream, + response.RequestStream.Apply(ConvertRpcException), response.ResponseAsync.Apply(ConvertRpcException), response.ResponseHeadersAsync, response.GetStatus, @@ -103,7 +102,15 @@ public static IAsyncStreamReader Apply(this IAsyncStreamRead public static Task Apply(this Task task, Func convertException) => task.ContinueWith(t => t.Exception?.InnerException is RpcException ex ? throw convertException(ex) : t.Result); - + + public static IClientStreamWriter Apply( + this IClientStreamWriter writer, Func convertException + ) => + new ExceptionConverterStreamWriter(writer, convertException); + + public static Task Apply(this Task task, Func convertException) => + task.ContinueWith(t => t.Exception?.InnerException is RpcException ex ? throw convertException(ex) : t); + public static AccessDeniedException ToAccessDeniedException(this RpcException exception) => new(exception.Message, exception); @@ -142,3 +149,17 @@ public async Task MoveNext(CancellationToken cancellationToken) { } } } + +class ExceptionConverterStreamWriter( + IClientStreamWriter writer, + Func convertException +) + : IClientStreamWriter { + public WriteOptions? WriteOptions { + get => writer.WriteOptions; + set => writer.WriteOptions = value; + } + + public Task WriteAsync(TRequest message) => writer.WriteAsync(message).Apply(convertException); + public Task CompleteAsync() => writer.CompleteAsync().Apply(convertException); +} diff --git a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs index b961ce565..eba8aef1d 100644 --- a/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs +++ b/test/EventStore.Client.Streams.Tests/Append/append_to_stream.cs @@ -437,31 +437,26 @@ await Fixture.Streams.AppendToStreamAsync( ); // Force regular append by passing credentials - await Assert.ThrowsAsync( - async () =>{ - await Fixture.Streams.AppendToStreamAsync( - streamName, - StreamState.StreamExists, - GetEvents(), - userCredentials: new UserCredentials("admin", "changeit") - ); - } - ); + await Fixture.Streams.AppendToStreamAsync( + streamName, + StreamState.Any, + GetEvents(), + userCredentials: new UserCredentials("admin", "changeit") + ).ShouldThrowAsync(); // No more events should be appended to the stream var eventsCount = await Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start) .CountAsync(); + eventsCount.ShouldBe(initialNumberOfEvents, "No more events should be appended to the stream"); return; // Throw an exception after 5 events IEnumerable GetEvents() { - for (var i = 0; i < 5; i++) { + for (var i = 0; i < 100; i++) { yield return Fixture.CreateTestEvents(1).First(); } - - throw new EnumerationFailedException(); } } @@ -480,33 +475,6 @@ public async Task with_timeout_stream_revision_fails_when_operation_expired() { ex.StatusCode.ShouldBe(StatusCode.DeadlineExceeded); } - - [Fact] - public async Task when_events_enumerator_throws_the_write_does_not_succeed() { - var streamName = Fixture.GetStreamName(); - - await Fixture.Streams - .AppendToStreamAsync(streamName, StreamRevision.None, GetEvents()) - .ShouldThrowAsync(); - - var result = Fixture.Streams.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start); - - var state = await result.ReadState; - - state.ShouldBe(ReadState.StreamNotFound); - - return; - - IEnumerable GetEvents() { - var i = 0; - foreach (var evt in Fixture.CreateTestEvents(5)) { - if (i++ % 3 == 0) - throw new EnumerationFailedException(); - - yield return evt; - } - } - } class EnumerationFailedException : Exception { }