Skip to content

Commit

Permalink
improve LiveList
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Dec 12, 2023
1 parent 83f3eb6 commit bd2a01c
Showing 1 changed file with 104 additions and 10 deletions.
114 changes: 104 additions & 10 deletions src/R3/LiveList.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.InteropServices;

namespace R3;

Expand Down Expand Up @@ -28,19 +29,23 @@ public static LiveList<TMessage, TComplete> ToLiveList<TMessage, TComplete>(this

public sealed class LiveList<T> : IReadOnlyList<T>, IDisposable
{
readonly RingBuffer<T> list = new RingBuffer<T>();
readonly IReadOnlyList<T> list; // RingBuffer<T> or List<T>
readonly IDisposable sourceSubscription;
readonly int bufferSize;

public LiveList(Event<T> source)
: this(source, -1)
{
if (bufferSize == 0) bufferSize = 1;
this.bufferSize = -1;
this.list = new List<T>();
this.sourceSubscription = source.Subscribe(new ListSubscriber(this));
}

public LiveList(Event<T> source, int bufferSize)
{
if (bufferSize == 0) bufferSize = 1;
this.bufferSize = bufferSize; // bufferSize must set before Subscribe(sometimes Subscribe run immediately)
this.list = new RingBuffer<T>(bufferSize);
this.sourceSubscription = source.Subscribe(new ListSubscriber(this));
}

Expand All @@ -66,6 +71,14 @@ public int Count
}
}

public void Clear()
{
lock (list)
{
list.Clear();
}
}

public void Dispose()
{
sourceSubscription.Dispose();
Expand Down Expand Up @@ -127,11 +140,20 @@ protected override void OnNextCore(T message)
{
lock (parent.list)
{
if (parent.list.Count == parent.bufferSize)
if (parent.bufferSize == -1)
{
((List<T>)parent.list).Add(message);
}
else
{
parent.list.RemoveFirst();
var ring = (RingBuffer<T>)parent.list;

if (ring.Count == parent.bufferSize)
{
ring.RemoveFirst();
}
ring.AddLast(message);
}
parent.list.AddLast(message);
}
}

Expand All @@ -144,7 +166,7 @@ protected override void OnErrorResumeCore(Exception error)

public sealed class LiveList<T, TComplete> : IReadOnlyList<T>, IDisposable
{
readonly RingBuffer<T> list = new RingBuffer<T>();
readonly IReadOnlyList<T> list; // RingBuffer<T> or List<T>
readonly IDisposable sourceSubscription;
readonly int bufferSize;

Expand All @@ -157,14 +179,18 @@ public sealed class LiveList<T, TComplete> : IReadOnlyList<T>, IDisposable
public TComplete? CompletedValue => completedValue;

public LiveList(CompletableEvent<T, TComplete> source)
: this(source, -1)
{
if (bufferSize == 0) bufferSize = 1;
this.bufferSize = -1;
this.list = new List<T>();
this.sourceSubscription = source.Subscribe(new ListSubscriber(this));
}

public LiveList(CompletableEvent<T, TComplete> source, int bufferSize)
{
if (bufferSize == 0) bufferSize = 1;
this.bufferSize = bufferSize; // bufferSize must set before Subscribe(sometimes Subscribe run immediately)
this.list = new RingBuffer<T>(bufferSize);
this.sourceSubscription = source.Subscribe(new ListSubscriber(this));
}

Expand All @@ -190,6 +216,14 @@ public int Count
}
}

public void Clear()
{
lock (list)
{
list.Clear();
}
}

public void Dispose()
{
sourceSubscription.Dispose();
Expand Down Expand Up @@ -251,12 +285,20 @@ protected override void OnNextCore(T message)
{
lock (parent.list)
{
if (parent.list.Count == parent.bufferSize)
if (parent.bufferSize == -1)
{
parent.list.RemoveFirst();
((List<T>)parent.list).Add(message);
}
else
{
var ring = (RingBuffer<T>)parent.list;

if (ring.Count == parent.bufferSize)
{
ring.RemoveFirst();
}
ring.AddLast(message);
}
parent.list.AddLast(message);
}
}

Expand All @@ -275,3 +317,55 @@ protected override void OnCompletedCore(TComplete complete)
}
}
}

file static class RingBufferOrListExtensions
{
public static RingBufferSpan<T> GetSpan<T>(this IReadOnlyList<T> list)
{
if (list is RingBuffer<T> r)
{
return r.GetSpan();
}
else if (list is List<T> l)
{
var span1 = CollectionsMarshal.AsSpan(l);
return new RingBufferSpan<T>(span1, default, span1.Length);
}
else
{
throw new NotSupportedException();
}
}

public static void Clear<T>(this IReadOnlyList<T> list)
{
if (list is RingBuffer<T> r)
{
r.Clear();
}
else if (list is List<T> l)
{
l.Clear();
}
else
{
throw new NotSupportedException();
}
}

public static T[] ToArray<T>(this IReadOnlyList<T> list)
{
if (list is RingBuffer<T> r)
{
return r.ToArray();
}
else if (list is List<T> l)
{
return CollectionsMarshal.AsSpan(l).ToArray();
}
else
{
throw new NotSupportedException();
}
}
}

0 comments on commit bd2a01c

Please sign in to comment.