Skip to content

Commit

Permalink
Add missing tlsCAFile option
Browse files Browse the repository at this point in the history
  • Loading branch information
w1am committed Feb 6, 2024
1 parent def981a commit b5ca231
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 94 deletions.
70 changes: 59 additions & 11 deletions src/EventStore.Client.Streams/EventStoreClient.Read.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using EventStore.Client.Streams;
using Grpc.Core;
using static EventStore.Client.Streams.ReadResp;
Expand Down Expand Up @@ -42,23 +37,76 @@ public ReadAllStreamResult ReadAllAsync(
Options = new() {
ReadDirection = direction switch {
Direction.Backwards => ReadReq.Types.Options.Types.ReadDirection.Backwards,
Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards,
_ => throw InvalidOption(direction)
Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards,
_ => throw InvalidOption(direction)
},
ResolveLinks = resolveLinkTos,
All = new() {
Position = new() {
CommitPosition = position.CommitPosition,
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
}
},
Count = (ulong)maxCount,
UuidOption = new() {Structured = new()},
NoFilter = new(),
Count = (ulong)maxCount,
UuidOption = new() {Structured = new()},
NoFilter = new(),
ControlOption = new() {Compatibility = 1}
}
}, Settings, deadline, userCredentials, cancellationToken);
}

/// <summary>
/// Asynchronously reads all events with filtering.
/// </summary>
/// <param name="direction">The <see cref="Direction"/> in which to read.</param>
/// <param name="position">The <see cref="Position"/> to start reading from.</param>
/// <param name="eventFilter">The <see cref="IEventFilter"/> to apply.</param>
/// <param name="maxCount">The maximum count to read.</param>
/// <param name="resolveLinkTos">Whether to resolve LinkTo events automatically.</param>
/// <param name="deadline"></param>
/// <param name="userCredentials">The optional <see cref="UserCredentials"/> to perform operation with.</param>
/// <param name="cancellationToken">The optional <see cref="System.Threading.CancellationToken"/>.</param>
/// <returns></returns>
public ReadAllStreamResult ReadAllAsync(
Direction direction,
Position position,
IEventFilter eventFilter,
long maxCount = long.MaxValue,
bool resolveLinkTos = false,
TimeSpan? deadline = null,
UserCredentials? userCredentials = null,
CancellationToken cancellationToken = default
) {
if (maxCount <= 0) {
throw new ArgumentOutOfRangeException(nameof(maxCount));
}

var readReq = new ReadReq {
Options = new() {
ReadDirection = direction switch {
Direction.Backwards => ReadReq.Types.Options.Types.ReadDirection.Backwards,
Direction.Forwards => ReadReq.Types.Options.Types.ReadDirection.Forwards,
_ => throw InvalidOption(direction)
},
ResolveLinks = resolveLinkTos,
All = new() {
Position = new() {
CommitPosition = position.CommitPosition,
PreparePosition = position.PreparePosition
}
},
Count = (ulong)maxCount,
UuidOption = new() { Structured = new() },
ControlOption = new() { Compatibility = 1 },
Filter = GetFilterOptions(eventFilter)
}
};

return new ReadAllStreamResult(async _ => {
var channelInfo = await GetChannelInfo(cancellationToken).ConfigureAwait(false);
return channelInfo.CallInvoker;
}, readReq, Settings, deadline, userCredentials, cancellationToken);
}

/// <summary>
/// A class that represents the result of a read operation on the $all stream. You may either enumerate this instance directly or <see cref="Messages"/>. Do not enumerate more than once.
Expand Down
12 changes: 6 additions & 6 deletions src/EventStore.Client.Streams/EventStoreClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,11 @@ private StreamAppender CreateStreamAppender() {
}
}

private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(
SubscriptionFilterOptions? filterOptions) {
if (filterOptions == null) {
private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(IEventFilter? filter, uint checkpointInterval = 0) {
if (filter == null) {
return null;
}

var filter = filterOptions.Filter;

var options = filter switch {
StreamFilter => new ReadReq.Types.Options.Types.FilterOptions {
StreamIdentifier = (filter.Prefixes, filter.Regex) switch {
Expand Down Expand Up @@ -127,11 +124,14 @@ private StreamAppender CreateStreamAppender() {
options.Count = new Empty();
}

options.CheckpointIntervalMultiplier = filterOptions.CheckpointInterval;
options.CheckpointIntervalMultiplier = checkpointInterval;

return options;
}

private static ReadReq.Types.Options.Types.FilterOptions? GetFilterOptions(SubscriptionFilterOptions? filterOptions)
=> filterOptions == null ? null : GetFilterOptions(filterOptions.Filter, filterOptions.CheckpointInterval);

/// <inheritdoc />
public override void Dispose() {
if (_streamAppenderLazy.IsValueCreated)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,13 @@ public bool Insecure {
/// True if certificates will be validated; otherwise false.
/// </summary>
public bool TlsVerifyCert { get; set; } = true;


/// <summary>
/// Path to a certificate file for secure connection. Not required for enabling secure connection. Useful for self-signed certificate
/// that are not installed on the system trust store.
/// </summary>
public string? TlsCaFile { get; set; }

/// <summary>
/// The default <see cref="EventStoreClientConnectivitySettings"/>.
/// </summary>
Expand Down
Loading

0 comments on commit b5ca231

Please sign in to comment.