Skip to content

Commit

Permalink
Added missing SourceWithContext.FromTuples operator (#5987)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed authored Jun 10, 2022
1 parent 6cd1b3a commit daa2491
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -2116,6 +2116,10 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T3, TMat> ZipWith<T1, T2, T3, TMat>(this Akka.Streams.Dsl.Source<T1, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SourceShape<T2>, TMat> other, System.Func<T1, T2, T3> combine) { }
public static Akka.Streams.Dsl.Source<System.ValueTuple<TOut1, long>, TMat> ZipWithIndex<TOut1, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow) { }
}
public class static SourceWithContext
{
public static Akka.Streams.Dsl.SourceWithContext<TOut, TCtxOut, TMat> FromTuples<TOut, TCtxOut, TMat>(Akka.Streams.Dsl.Source<System.ValueTuple<TOut, TCtxOut>, TMat> source) { }
}
public class static SourceWithContextOperations
{
public static Akka.Streams.Dsl.SourceWithContext<TOut2, TCtx, TMat> Collect<TOut, TCtx, TOut2, TMat>(this Akka.Streams.Dsl.SourceWithContext<TOut, TCtx, TMat> flow, System.Func<TOut, TOut2> fn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2116,6 +2116,10 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T3, TMat> ZipWith<T1, T2, T3, TMat>(this Akka.Streams.Dsl.Source<T1, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SourceShape<T2>, TMat> other, System.Func<T1, T2, T3> combine) { }
public static Akka.Streams.Dsl.Source<System.ValueTuple<TOut1, long>, TMat> ZipWithIndex<TOut1, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow) { }
}
public class static SourceWithContext
{
public static Akka.Streams.Dsl.SourceWithContext<TOut, TCtxOut, TMat> FromTuples<TOut, TCtxOut, TMat>(Akka.Streams.Dsl.Source<System.ValueTuple<TOut, TCtxOut>, TMat> source) { }
}
public class static SourceWithContextOperations
{
public static Akka.Streams.Dsl.SourceWithContext<TOut2, TCtx, TMat> Collect<TOut, TCtx, TOut2, TMat>(this Akka.Streams.Dsl.SourceWithContext<TOut, TCtx, TMat> flow, System.Func<TOut, TOut2> fn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2116,6 +2116,10 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<T3, TMat> ZipWith<T1, T2, T3, TMat>(this Akka.Streams.Dsl.Source<T1, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SourceShape<T2>, TMat> other, System.Func<T1, T2, T3> combine) { }
public static Akka.Streams.Dsl.Source<System.ValueTuple<TOut1, long>, TMat> ZipWithIndex<TOut1, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow) { }
}
public class static SourceWithContext
{
public static Akka.Streams.Dsl.SourceWithContext<TOut, TCtxOut, TMat> FromTuples<TOut, TCtxOut, TMat>(Akka.Streams.Dsl.Source<System.ValueTuple<TOut, TCtxOut>, TMat> source) { }
}
public class static SourceWithContextOperations
{
public static Akka.Streams.Dsl.SourceWithContext<TOut2, TCtx, TMat> Collect<TOut, TCtx, TOut2, TMat>(this Akka.Streams.Dsl.SourceWithContext<TOut, TCtx, TMat> flow, System.Func<TOut, TOut2> fn)
Expand Down
13 changes: 13 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ public void SourceWithContext_must_get_created_from_AsSourceWithContext()
.ExpectComplete();
}

[Fact]
public void SourceWithContext_must_get_created_from_a_source_of_tuple2()
{
var msg = new Message("a", 1L);

SourceWithContext.FromTuples(Source.From(new[] { (msg, msg.Offset) }))
.AsSource()
.RunWith(this.SinkProbe<(Message, long)>(), Materializer)
.Request(1)
.ExpectNext((msg, 1L))
.ExpectComplete();
}

[Fact]
public void SourceWithContext_must_be_able_to_get_turned_back_into_a_normal_source()
{
Expand Down
11 changes: 10 additions & 1 deletion src/core/Akka.Streams/Dsl/SourceWithContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,18 @@

namespace Akka.Streams.Dsl
{
public static class SourceWithContext
{
/// <summary>
/// Creates a <see cref="SourceWithContext"/> from a regular source that operates on a tuple of `(data, context)` elements.
/// </summary>
public static SourceWithContext<TOut, TCtxOut, TMat> FromTuples<TOut, TCtxOut, TMat>(Source<(TOut, TCtxOut), TMat> source) =>
new SourceWithContext<TOut, TCtxOut, TMat>(source);
}

/// <summary>
/// A source that provides operations which automatically propagate the context of an element.
/// Only a subset of common operations from [[FlowOps]] is supported. As an escape hatch you can
/// Only a subset of common operations from <see cref="FlowOperations"/> is supported. As an escape hatch you can
/// use [[FlowWithContextOps.via]] to manually provide the context propagation for otherwise unsupported
/// operations.
/// </summary>
Expand Down

0 comments on commit daa2491

Please sign in to comment.