Skip to content

Commit

Permalink
Reorder Source/FlowWithContext type parameters (#5648)
Browse files Browse the repository at this point in the history
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
ismaelhamed and Aaronontheweb authored Mar 6, 2022
1 parent 2bbc58c commit 60d95ac
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 128 deletions.
67 changes: 35 additions & 32 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt

Large diffs are not rendered by default.

80 changes: 80 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowWithContextSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//-----------------------------------------------------------------------
// <copyright file="FlowWithContextSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Dsl
{
public class FlowWithContextSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }

public FlowWithContextSpec(ITestOutputHelper helper) : base(helper)
{
var settings = ActorMaterializerSettings.Create(Sys);
Materializer = ActorMaterializer.Create(Sys, settings);
}

[Fact]
public void A_FlowWithContext_must_get_created_from_FlowAsFlowWithContext()
{
var flow = Flow.Create<Message>().Select(m => m.Copy(data: m.Data + "z"));
var flowWithContext = flow.AsFlowWithContext<Message, long, Message, long, NotUsed, Message>((m, o) => new Message(m.Data, o), m => m.Offset);

Source.From(new[] { new Message("a", 1L) })
.AsSourceWithContext(m => m.Offset)
.Via(flowWithContext)
.AsSource()
.RunWith(this.SinkProbe<(Message, long)>(), Materializer)
.Request(1)
.ExpectNext((new Message("az", 1L), 1L))
.ExpectComplete();
}
}

sealed class Message : IEquatable<Message>
{
public string Data { get; }
public long Offset { get; }

public Message(string data, long offset)
{
Data = data;
Offset = offset;
}

public Message Copy(string data = null, long? offset = null) => new Message(data ?? Data, offset ?? Offset);

public bool Equals(Message other)
{
if (other is null) return false;
if (ReferenceEquals(this, other)) return true;
return string.Equals(Data, other.Data) && Offset == other.Offset;
}

public override bool Equals(object obj)
{
if (obj is null) return false;
if (ReferenceEquals(this, obj)) return true;
return obj is Message other && Equals(other);
}

public override int GetHashCode()
{
unchecked
{
return ((Data != null ? Data.GetHashCode() : 0) * 397) ^ Offset.GetHashCode();
}
}
}
}
7 changes: 2 additions & 5 deletions src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,17 +126,14 @@ public void SourceWithContext_must_pass_through_context_using_Select_and_Where()
[Fact]
public void SourceWithContext_must_pass_through_context_using_FlowWithContext()
{
var flowWithContext = FlowWithContext.Create<long, string>();

var msg = new Message("a", 1);
var flowWithContext = FlowWithContext.Create<string, long>();

var sink = this.CreateSubscriberProbe<(string, long)>();

Source.From(new[] { msg })
Source.From(new[] { new Message("a", 1L) })
.AsSourceWithContext(x => x.Offset)
.Select(x => x.Data)
.Via(flowWithContext.Select(s => s + "b"))
.AsSource()
.RunWith(Sink.FromSubscriber(sink), Materializer);

var sub = sink.ExpectSubscription();
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/WithContextUsageSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private static CommittableMessage<Record>[] GenInput(int start, int end) =>
new CommittableMessage<Record>(new Record(GenKey(i), GenValue(i)), new CommittableOffsetImpl(i)))
.ToArray();

private static SourceWithContext<Offset, Record, NotUsed> CreateSourceWithContext(
private static SourceWithContext<Record, Offset, NotUsed> CreateSourceWithContext(
params CommittableMessage<Record>[] messages) =>
CommittableConsumer.CommittableSource(messages)
.AsSourceWithContext(m => new Offset(m.Offset.Offset))
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams/Dsl/FlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2399,7 +2399,7 @@ public static Flow<T, T, TMat> Watch<T, TMat>(this Flow<T, T, TMat> flow, IActor
/// <typeparam name="TCtxOut">Resulting context type</typeparam>
/// <typeparam name="TMat">Materialized value type</typeparam>
/// <typeparam name="TIn2">Type of passed flow elements</typeparam>
public static FlowWithContext<TCtxIn, TIn, TCtxOut, TOut, TMat> AsFlowWithContext<TCtxIn, TIn, TCtxOut, TOut, TMat, TIn2>(
public static FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat> AsFlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat, TIn2>(
this Flow<TIn2, TOut, TMat> flow,
Func<TIn, TCtxIn, TIn2> collapseContext,
Func<TOut, TCtxOut> extractContext)
Expand Down
34 changes: 17 additions & 17 deletions src/core/Akka.Streams/Dsl/FlowWithContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Runtime.CompilerServices;
using Akka.Annotations;

namespace Akka.Streams.Dsl
{
Expand All @@ -17,28 +18,27 @@ namespace Akka.Streams.Dsl
/// operations.
///
/// An "empty" flow can be created by calling <see cref="FlowWithContext.Create{TCtx,TIn}"/>.
///
/// API MAY CHANGE
///</summary>
public sealed class FlowWithContext<TCtxIn, TIn, TCtxOut, TOut, TMat>
///</summary>
[ApiMayChange]
public sealed class FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat>
: GraphDelegate<FlowShape<(TIn, TCtxIn), (TOut, TCtxOut)>, TMat>
{
internal FlowWithContext(Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow)
internal FlowWithContext(Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow)
: base(flow)
{
}

///<summary>
/// Transform this flow by the regular flow. The given flow must support manual context propagation by
/// taking and producing tuples of (data, context).
///
/// This can be used as an escape hatch for operations that are not (yet) provided with automatic
/// context propagation here.
///</summary>
public FlowWithContext<TCtxIn, TIn, TCtx2, TOut2, TMat> Via<TCtx2, TOut2, TMat2>(
public FlowWithContext<TIn, TCtxIn, TOut2, TCtx2, TMat> Via<TOut2, TCtx2, TMat2>(
IGraph<FlowShape<(TOut, TCtxOut), (TOut2, TCtx2)>, TMat2> viaFlow) =>
FlowWithContext.From(Flow.FromGraph(Inner).Via(viaFlow));

///<summary>
/// Transform this flow by the regular flow. The given flow must support manual context propagation by
/// taking and producing tuples of (data, context).
Expand All @@ -49,7 +49,7 @@ public FlowWithContext<TCtxIn, TIn, TCtx2, TOut2, TMat> Via<TCtx2, TOut2, TMat2>
/// The <paramref name="combine"/> function is used to compose the materialized values of this flow and that
/// flow into the materialized value of the resulting Flow.
///</summary>
public FlowWithContext<TCtxIn, TIn, TCtx2, TOut2, TMat3> ViaMaterialized<TCtx2, TOut2, TMat2, TMat3>(
public FlowWithContext<TIn, TCtxIn, TOut2, TCtx2, TMat3> ViaMaterialized<TOut2, TCtx2, TMat2, TMat3>(
IGraph<FlowShape<(TOut, TCtxOut), (TOut2, TCtx2)>, TMat2> viaFlow, Func<TMat, TMat2, TMat3> combine) =>
FlowWithContext.From(Flow.FromGraph(Inner).ViaMaterialized(viaFlow, combine));

Expand All @@ -60,17 +60,17 @@ public FlowWithContext<TCtxIn, TIn, TCtx2, TOut2, TMat3> ViaMaterialized<TCtx2,
public static class FlowWithContext
{
/// <summary>
/// Creates an "empty" <see cref="FlowWithContext{TCtxIn,TIn,TCtxOut,TOut,TMat}"/> that passes elements through with their context unchanged.
/// Creates an "empty" <see cref="FlowWithContext{TIn,TCtxIn,TOut,TCtxOut,TMat}"/> that passes elements through with their context unchanged.
/// </summary>
/// <typeparam name="TCtx"></typeparam>
/// <typeparam name="TIn"></typeparam>
/// <typeparam name="TCtx"></typeparam>
/// <returns></returns>
public static FlowWithContext<TCtx, TIn, TCtx, TIn, NotUsed> Create<TCtx, TIn>()
public static FlowWithContext<TIn, TCtx, TIn, TCtx, NotUsed> Create<TIn, TCtx>()
{
var under = Flow.Create<(TIn, TCtx), NotUsed>();
return new FlowWithContext<TCtx, TIn, TCtx, TIn, NotUsed>(under);
return new FlowWithContext<TIn, TCtx, TIn, TCtx, NotUsed>(under);
}

/// <summary>
/// Creates a FlowWithContext from a regular flow that operates on a pair of `(data, context)` elements.
/// </summary>
Expand All @@ -81,8 +81,8 @@ public static FlowWithContext<TCtx, TIn, TCtx, TIn, NotUsed> Create<TCtx, TIn>()
/// <typeparam name="TOut"></typeparam>
/// <typeparam name="TMat"></typeparam>
/// <returns></returns>
public static FlowWithContext<TCtxIn, TIn, TCtxOut, TOut, TMat> From<TCtxIn, TIn, TCtxOut, TOut, TMat>(
Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow) =>
new FlowWithContext<TCtxIn, TIn, TCtxOut, TOut, TMat>(flow);
public static FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat> From<TIn, TCtxIn, TOut, TCtxOut, TMat>(
Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow) =>
new FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat>(flow);
}
}
Loading

0 comments on commit 60d95ac

Please sign in to comment.