Skip to content

Commit

Permalink
where done
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 10, 2023
1 parent 379eeb3 commit f9360d8
Show file tree
Hide file tree
Showing 21 changed files with 1,196 additions and 792 deletions.
9 changes: 9 additions & 0 deletions R3.sln
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sandbox", "sandbox", "{FAB2
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ConsoleApp1", "sandbox\ConsoleApp1\ConsoleApp1.csproj", "{72DE3CB9-195E-4740-9416-5960D75ED795}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{0544806B-3BB4-43CF-8277-BC612F32208D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "R3.Tests", "tests\R3.Tests\R3.Tests.csproj", "{42F7C4F7-3BB3-4DC8-8285-9DFF7E93BC4B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -25,13 +29,18 @@ Global
{72DE3CB9-195E-4740-9416-5960D75ED795}.Debug|Any CPU.Build.0 = Debug|Any CPU
{72DE3CB9-195E-4740-9416-5960D75ED795}.Release|Any CPU.ActiveCfg = Release|Any CPU
{72DE3CB9-195E-4740-9416-5960D75ED795}.Release|Any CPU.Build.0 = Release|Any CPU
{42F7C4F7-3BB3-4DC8-8285-9DFF7E93BC4B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{42F7C4F7-3BB3-4DC8-8285-9DFF7E93BC4B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{42F7C4F7-3BB3-4DC8-8285-9DFF7E93BC4B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{42F7C4F7-3BB3-4DC8-8285-9DFF7E93BC4B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{17F32115-0300-4842-8328-395AB56F6BD7} = {9FA6D327-728B-4436-AE3A-9E46D8FEF591}
{72DE3CB9-195E-4740-9416-5960D75ED795} = {FAB2137C-1DBA-4F2F-8E22-DF3521C9B365}
{42F7C4F7-3BB3-4DC8-8285-9DFF7E93BC4B} = {0544806B-3BB4-43CF-8277-BC612F32208D}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {84B77761-6B9E-46BA-B132-6C77B0B6E4FA}
Expand Down
15 changes: 12 additions & 3 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
// logger.ZLogInformation($"{x.TrackingId,3}: {Environment.NewLine}{x.StackTrace.Replace("R2.", "").Replace("C:\\MyGit\\R2\\sandbox\\ConsoleApp1\\", "").Replace("C:\\MyGit\\R2\\src\\R2\\", "")}");


logger.ZLogInformation($"{x.TrackingId,3}: {x.FormattedType}");
// logger.ZLogInformation($"{x.TrackingId,3}: {x.FormattedType}");
});

publisher.PublishOnNext(1);
Expand All @@ -51,17 +51,26 @@



var s = new System.Reactive.Subjects.Subject<string>();


// s.Where(

// new Result<int>(








foreach (var item in typeof(System.Reactive.Linq.Observable).GetMethods().Select(x => x.Name).Distinct().OrderBy(x => x))
{
if (item == "ToString" || item == "Equals" || item == "GetHashCode" || item == "GetType")
{
continue;
}
Console.WriteLine("- [ ] " + item);
}



Expand Down
4 changes: 2 additions & 2 deletions src/R3/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ public void Dispose()
{
if (!SourceSubscription.IsDisposed)
{
DisposeCore();
SourceSubscription.Dispose();
DisposeCore(); // Dispose self
SourceSubscription.Dispose(); // Dispose attached parent
}
}
}
41 changes: 34 additions & 7 deletions src/R3/EventFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,44 @@ namespace R3;

public static partial class EventFactory
{
public static CompletableEvent<int, Unit> Range(int start, int count)
{
return new Range(start, count);
}

public static CompletableEvent<TMessage, Unit> ToEvent<TMessage>(this IEnumerable<TMessage> source)
{
return new EnumerableToEvent<TMessage>(source);
}

public static CompletableEvent<long, Unit> Timer(TimeSpan dueTime, TimeProvider timeProvider)
public static CompletableEvent<Unit, Unit> Timer(TimeSpan dueTime, TimeProvider timeProvider)
{
return new Timer(dueTime, timeProvider);
}
}

internal sealed class Range : CompletableEvent<int, Unit>
{
readonly int start;
readonly int count;

public Range(int start, int count)
{
this.start = start;
this.count = count;
}

protected override IDisposable SubscribeCore(Subscriber<int, Unit> subscriber)
{
for (int i = 0; i < count; i++)
{
subscriber.OnNext(start + i);
}
subscriber.OnCompleted(default);
return Disposable.Empty;
}
}

internal class EnumerableToEvent<TMessage>(IEnumerable<TMessage> source) : CompletableEvent<TMessage, Unit>
{
protected override IDisposable SubscribeCore(Subscriber<TMessage, Unit> subscriber)
Expand All @@ -28,7 +55,7 @@ protected override IDisposable SubscribeCore(Subscriber<TMessage, Unit> subscrib
}
}

internal class Timer : CompletableEvent<long, Unit>
internal sealed class Timer : CompletableEvent<Unit, Unit>
{
readonly TimeSpan dueTime;
readonly TimeProvider timeProvider;
Expand All @@ -39,19 +66,19 @@ public Timer(TimeSpan dueTime, TimeProvider timeProvider)
this.timeProvider = timeProvider;
}

protected override IDisposable SubscribeCore(Subscriber<long, Unit> subscriber)
protected override IDisposable SubscribeCore(Subscriber<Unit, Unit> subscriber)
{
var method = new _Timer(subscriber);
method.Timer = timeProvider.CreateStoppedTimer(_Timer.timerCallback, method);
method.Timer.InvokeOnce(dueTime);
return method;
}

sealed class _Timer(Subscriber<long, Unit> subscriber) : IDisposable
sealed class _Timer(Subscriber<Unit, Unit> subscriber) : IDisposable
{
public static readonly TimerCallback timerCallback = NextTick;

Subscriber<long, Unit> subscriber = subscriber;
Subscriber<Unit, Unit> subscriber = subscriber;

public ITimer? Timer { get; set; }

Expand All @@ -60,8 +87,8 @@ static void NextTick(object? state)
var self = (_Timer)state!;
try
{
self.subscriber.OnNext(0);
self.subscriber.OnCompleted(Unit.Default);
self.subscriber.OnNext(default);
self.subscriber.OnCompleted();
}
finally
{
Expand Down
140 changes: 72 additions & 68 deletions src/R3/Operators/CombineLatest.cs
Original file line number Diff line number Diff line change
@@ -1,103 +1,107 @@
namespace R3;

public static partial class EventExtensions
namespace R3
{
public static Event<TResult> CombineLatest<TLeft, TRight, TResult>(this Event<TLeft> left, Event<TRight> right, Func<TLeft, TRight, TResult> selector)
public static partial class EventExtensions
{
return new CombineLatest<TLeft, TRight, TResult>(left, right, selector);
public static Event<TResult> CombineLatest<TLeft, TRight, TResult>(this Event<TLeft> left, Event<TRight> right, Func<TLeft, TRight, TResult> selector)
{
return new CombineLatest<TLeft, TRight, TResult>(left, right, selector);
}
}
}

internal sealed class CombineLatest<TLeft, TRight, TResult>(Event<TLeft> left, Event<TRight> right, Func<TLeft, TRight, TResult> selector) : Event<TResult>
namespace R3.Operators
{
protected override IDisposable SubscribeCore(Subscriber<TResult> subscriber)
internal sealed class CombineLatest<TLeft, TRight, TResult>(Event<TLeft> left, Event<TRight> right, Func<TLeft, TRight, TResult> selector) : Event<TResult>
{
var method = new _CombineLatest(subscriber, selector);

var d1 = left.Subscribe(new LeftSubscriber(method));
try
protected override IDisposable SubscribeCore(Subscriber<TResult> subscriber)
{
var d2 = right.Subscribe(new RightSubscriber(method));
return Disposable.Combine(d1, d2);
}
catch
{
d1.Dispose();
throw;
}
}
var method = new _CombineLatest(subscriber, selector);

class _CombineLatest(Subscriber<TResult> subscriber, Func<TLeft, TRight, TResult> selector)
{
TLeft? message1;
bool hasMessage1;
TRight? message2;
bool hasMessage2;
var d1 = left.Subscribe(new LeftSubscriber(method));
try
{
var d2 = right.Subscribe(new RightSubscriber(method));
return Disposable.Combine(d1, d2);
}
catch
{
d1.Dispose();
throw;
}
}

public void OnNext(TLeft message)
class _CombineLatest(Subscriber<TResult> subscriber, Func<TLeft, TRight, TResult> selector)
{
var canPublish = false;
TRight? msg2 = default;
lock (this)
TLeft? message1;
bool hasMessage1;
TRight? message2;
bool hasMessage2;

public void OnNext(TLeft message)
{
hasMessage1 = true;
message1 = message;
var canPublish = false;
TRight? msg2 = default;
lock (this)
{
hasMessage1 = true;
message1 = message;

if (hasMessage2)
{
canPublish = true;
msg2 = message2;
}
}

if (hasMessage2)
if (canPublish)
{
canPublish = true;
msg2 = message2;
Publish(message, msg2!);
}
}

if (canPublish)
public void OnNext(TRight message)
{
Publish(message, msg2!);
}
}
var canPublish = false;
TLeft? msg1 = default;
lock (this)
{
hasMessage2 = true;
message2 = message;

public void OnNext(TRight message)
{
var canPublish = false;
TLeft? msg1 = default;
lock (this)
{
hasMessage2 = true;
message2 = message;
if (hasMessage1)
{
canPublish = true;
msg1 = message1;
}
}

if (hasMessage1)
if (canPublish)
{
canPublish = true;
msg1 = message1;
Publish(msg1!, message);
}
}

if (canPublish)
void Publish(TLeft m1, TRight m2)
{
Publish(msg1!, message);
var result = selector(m1, m2);
subscriber.OnNext(result);
}
}

void Publish(TLeft m1, TRight m2)
sealed class LeftSubscriber(_CombineLatest parent) : Subscriber<TLeft>
{
var result = selector(m1, m2);
subscriber.OnNext(result);
}
}

sealed class LeftSubscriber(_CombineLatest parent) : Subscriber<TLeft>
{
public override void OnNext(TLeft message)
{
parent.OnNext(message);
public override void OnNext(TLeft message)
{
parent.OnNext(message);
}
}
}

sealed class RightSubscriber(_CombineLatest parent) : Subscriber<TRight>
{
public override void OnNext(TRight message)
sealed class RightSubscriber(_CombineLatest parent) : Subscriber<TRight>
{
parent.OnNext(message);
public override void OnNext(TRight message)
{
parent.OnNext(message);
}
}
}
}
Loading

0 comments on commit f9360d8

Please sign in to comment.