Skip to content

Commit

Permalink
ReplayFrameSubject, ReplayFrame
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Jan 3, 2024
1 parent 3cc6fce commit a7bfb9d
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 8 deletions.
22 changes: 20 additions & 2 deletions src/R3/Operators/Multicast.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public static partial class ObservableExtensions
{
// Multicast, Publish, Replay, Share
// Multicast, Publish, Replay, ReplayFrame, Share

public static ConnectableObservable<T> Multicast<T>(this Observable<T> source, ISubject<T> subject)
{
Expand Down Expand Up @@ -49,7 +49,25 @@ public static ConnectableObservable<T> Replay<T>(this Observable<T> source, int
return source.Multicast(new ReplaySubject<T>(bufferSize, window, timeProvider));
}

// TODO: ReplayFrame
public static ConnectableObservable<T> ReplayFrame<T>(this Observable<T> source, int window)
{
return source.Multicast(new ReplayFrameSubject<T>(window));
}

public static ConnectableObservable<T> ReplayFrame<T>(this Observable<T> source, int window, FrameProvider frameProvider)
{
return source.Multicast(new ReplayFrameSubject<T>(window, frameProvider));
}

public static ConnectableObservable<T> ReplayFrame<T>(this Observable<T> source, int bufferSize, int window)
{
return source.Multicast(new ReplayFrameSubject<T>(bufferSize, window));
}

public static ConnectableObservable<T> ReplayFrame<T>(this Observable<T> source, int bufferSize, int window, FrameProvider frameProvider)
{
return source.Multicast(new ReplayFrameSubject<T>(bufferSize, window, frameProvider));
}

public static Observable<T> Share<T>(this Observable<T> source)
{
Expand Down
3 changes: 3 additions & 0 deletions src/R3/Operators/_Operators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ public static partial class ObservableExtensions

// TODO: this is working space, will remove this file after complete.

// Timebased
// ReplayFrameSubject, ReplayFrame

// Rx Merging:
// CombineLatest, Zip, WithLatestFrom, ZipLatest, Switch, Pairwise

Expand Down
192 changes: 192 additions & 0 deletions src/R3/ReplayFrameSubject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
namespace R3;

public sealed class ReplayFrameSubject<T> : Observable<T>, ISubject<T>, IDisposable
{
readonly int bufferSize;
readonly int window;
readonly FrameProvider frameProvider;
readonly RingBuffer<(long timestamp, T value)> replayBuffer; // lock object

// Subject
FreeListCore<Subscription> list;
CompleteState completeState;

public ReplayFrameSubject(int window)
: this(int.MaxValue, int.MaxValue, ObservableSystem.DefaultFrameProvider)
{
}

public ReplayFrameSubject(int window, FrameProvider frameProvider)
: this(int.MaxValue, window, frameProvider)
{
}

public ReplayFrameSubject(int bufferSize, int window)
: this(bufferSize, window, ObservableSystem.DefaultFrameProvider)
{
}

// full constructor
public ReplayFrameSubject(int bufferSize, int window, FrameProvider frameProvider)
{
this.bufferSize = bufferSize;
this.window = window;
this.frameProvider = frameProvider;
this.replayBuffer = new RingBuffer<(long, T)>(bufferSize < 8 ? bufferSize : 8);
this.list = new FreeListCore<Subscription>(replayBuffer);
}

public bool IsDisposed => completeState.IsDisposed;

public void OnNext(T value)
{
if (completeState.IsCompleted) return;

lock (replayBuffer)
{
Trim();
replayBuffer.AddLast((frameProvider?.GetFrameCount() ?? 0, value));
}

foreach (var subscription in list.AsSpan())
{
subscription?.observer.OnNext(value);
}
}

public void OnErrorResume(Exception error)
{
if (completeState.IsCompleted) return;

foreach (var subscription in list.AsSpan())
{
subscription?.observer.OnErrorResume(error);
}
}

public void OnCompleted(Result result)
{
var status = completeState.TrySetResult(result);
if (status != CompleteState.ResultStatus.Done)
{
return; // already completed
}

foreach (var subscription in list.AsSpan())
{
subscription?.observer.OnCompleted(result);
}
}

protected override IDisposable SubscribeCore(Observer<T> observer)
{
// raise latest value on subscribe(before check completed add observer to list)
lock (replayBuffer)
{
Trim(); // Trim before get span
var dualSpan = replayBuffer.GetSpan();
foreach (ref readonly var item in dualSpan.First)
{
observer.OnNext(item.value);
}
foreach (ref readonly var item in dualSpan.Second)
{
observer.OnNext(item.value);
}
}

var result = completeState.TryGetResult();
if (result != null)
{
observer.OnCompleted(result.Value);
return Disposable.Empty;
}

var subscription = new Subscription(this, observer); // create subscription and add observer to list.

// need to check called completed during adding
result = completeState.TryGetResult();
if (result != null)
{
subscription.observer.OnCompleted(result.Value);
subscription.Dispose();
return Disposable.Empty;
}

return subscription;
}

public void Dispose()
{
Dispose(true);
}

public void Dispose(bool callOnCompleted)
{
if (completeState.TrySetDisposed(out var alreadyCompleted))
{
if (callOnCompleted && !alreadyCompleted)
{
// not yet disposed so can call list iteration
foreach (var subscription in list.AsSpan())
{
subscription?.observer.OnCompleted();
}
}

list.Dispose();
lock (replayBuffer)
{
replayBuffer.Clear();
}
}
}

void Trim()
{
// Trim by Count
while (replayBuffer.Count > bufferSize)
{
replayBuffer.RemoveFirst();
}

// Trim by Time
var now = frameProvider.GetFrameCount();
while (replayBuffer.Count > 0)
{
var value = replayBuffer[0]; // peek first
var elapsed = now - value.timestamp;
if (elapsed >= window)
{
replayBuffer.RemoveFirst();
}
else
{
break;
}
}
}

sealed class Subscription : IDisposable
{
public readonly Observer<T> observer;
readonly int removeKey;
ReplayFrameSubject<T>? parent;

public Subscription(ReplayFrameSubject<T> parent, Observer<T> observer)
{
this.parent = parent;
this.observer = observer;
parent.list.Add(this, out removeKey); // for the thread-safety, add and set removeKey in same lock.
}

public void Dispose()
{
var p = Interlocked.Exchange(ref parent, null);
if (p == null) return;

// removeKey is index, will reuse if remove completed so only allows to call from here and must not call twice.
p.list.Remove(removeKey);
}
}
}
6 changes: 0 additions & 6 deletions src/R3/ReplaySubject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,3 @@ public void Dispose()
}
}
}

// TODO: ReplayFrameSubject?
// full constructor 2?
//public ReplaySubject(int bufferSize, int windowFrame, FrameProvider frameProvider)
//{
//}
42 changes: 42 additions & 0 deletions tests/R3.Tests/ReplayFrameSubjectTest .cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace R3.Tests;

public class ReplayFrameSubjectTest
{
[Fact]
public void ReplayTime()
{
var fakeTime = new ManualFrameProvider();

var subject = new ReplayFrameSubject<int>((3), fakeTime);

subject.OnNext(10);
fakeTime.Advance((1));
subject.ToLiveList().AssertEqual([10]);

subject.OnNext(20);
fakeTime.Advance((1));
subject.ToLiveList().AssertEqual([10, 20]);

subject.OnNext(30);
fakeTime.Advance((1));

var list = subject.ToLiveList();
subject.ToLiveList().AssertEqual([20, 30]);

subject.OnNext(40);
subject.OnNext(50);
subject.OnNext(60);

fakeTime.Advance((2));
subject.OnNext(70);

subject.ToLiveList().AssertEqual([40, 50, 60, 70]);
fakeTime.Advance((1));

subject.ToLiveList().AssertEqual([70]);

subject.OnCompleted();
subject.ToLiveList().AssertEqual([70]);
subject.ToLiveList().AssertIsCompleted();
}
}

0 comments on commit a7bfb9d

Please sign in to comment.