diff --git a/src/R3/LiveList.cs b/src/R3/LiveList.cs index 02a2fe5a..9e0282c5 100644 --- a/src/R3/LiveList.cs +++ b/src/R3/LiveList.cs @@ -1,5 +1,6 @@ using System.Collections; using System.Diagnostics.CodeAnalysis; +using System.Runtime.InteropServices; namespace R3; @@ -28,19 +29,23 @@ public static LiveList ToLiveList(this public sealed class LiveList : IReadOnlyList, IDisposable { - readonly RingBuffer list = new RingBuffer(); + readonly IReadOnlyList list; // RingBuffer or List readonly IDisposable sourceSubscription; readonly int bufferSize; public LiveList(Event source) - : this(source, -1) { + if (bufferSize == 0) bufferSize = 1; + this.bufferSize = -1; + this.list = new List(); + this.sourceSubscription = source.Subscribe(new ListSubscriber(this)); } public LiveList(Event source, int bufferSize) { if (bufferSize == 0) bufferSize = 1; this.bufferSize = bufferSize; // bufferSize must set before Subscribe(sometimes Subscribe run immediately) + this.list = new RingBuffer(bufferSize); this.sourceSubscription = source.Subscribe(new ListSubscriber(this)); } @@ -66,6 +71,14 @@ public int Count } } + public void Clear() + { + lock (list) + { + list.Clear(); + } + } + public void Dispose() { sourceSubscription.Dispose(); @@ -127,11 +140,20 @@ protected override void OnNextCore(T message) { lock (parent.list) { - if (parent.list.Count == parent.bufferSize) + if (parent.bufferSize == -1) + { + ((List)parent.list).Add(message); + } + else { - parent.list.RemoveFirst(); + var ring = (RingBuffer)parent.list; + + if (ring.Count == parent.bufferSize) + { + ring.RemoveFirst(); + } + ring.AddLast(message); } - parent.list.AddLast(message); } } @@ -144,7 +166,7 @@ protected override void OnErrorResumeCore(Exception error) public sealed class LiveList : IReadOnlyList, IDisposable { - readonly RingBuffer list = new RingBuffer(); + readonly IReadOnlyList list; // RingBuffer or List readonly IDisposable sourceSubscription; readonly int bufferSize; @@ -157,14 +179,18 @@ public sealed class LiveList : IReadOnlyList, IDisposable public TComplete? CompletedValue => completedValue; public LiveList(CompletableEvent source) - : this(source, -1) { + if (bufferSize == 0) bufferSize = 1; + this.bufferSize = -1; + this.list = new List(); + this.sourceSubscription = source.Subscribe(new ListSubscriber(this)); } public LiveList(CompletableEvent source, int bufferSize) { if (bufferSize == 0) bufferSize = 1; this.bufferSize = bufferSize; // bufferSize must set before Subscribe(sometimes Subscribe run immediately) + this.list = new RingBuffer(bufferSize); this.sourceSubscription = source.Subscribe(new ListSubscriber(this)); } @@ -190,6 +216,14 @@ public int Count } } + public void Clear() + { + lock (list) + { + list.Clear(); + } + } + public void Dispose() { sourceSubscription.Dispose(); @@ -251,12 +285,20 @@ protected override void OnNextCore(T message) { lock (parent.list) { - if (parent.list.Count == parent.bufferSize) + if (parent.bufferSize == -1) { - parent.list.RemoveFirst(); + ((List)parent.list).Add(message); + } + else + { + var ring = (RingBuffer)parent.list; + if (ring.Count == parent.bufferSize) + { + ring.RemoveFirst(); + } + ring.AddLast(message); } - parent.list.AddLast(message); } } @@ -275,3 +317,55 @@ protected override void OnCompletedCore(TComplete complete) } } } + +file static class RingBufferOrListExtensions +{ + public static RingBufferSpan GetSpan(this IReadOnlyList list) + { + if (list is RingBuffer r) + { + return r.GetSpan(); + } + else if (list is List l) + { + var span1 = CollectionsMarshal.AsSpan(l); + return new RingBufferSpan(span1, default, span1.Length); + } + else + { + throw new NotSupportedException(); + } + } + + public static void Clear(this IReadOnlyList list) + { + if (list is RingBuffer r) + { + r.Clear(); + } + else if (list is List l) + { + l.Clear(); + } + else + { + throw new NotSupportedException(); + } + } + + public static T[] ToArray(this IReadOnlyList list) + { + if (list is RingBuffer r) + { + return r.ToArray(); + } + else if (list is List l) + { + return CollectionsMarshal.AsSpan(l).ToArray(); + } + else + { + throw new NotSupportedException(); + } + } +}