From 0b9d7f736f34314f3d8da34994fb646731ed00b9 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 20 Jun 2023 11:32:54 -0400 Subject: [PATCH 1/7] chore: Improve debug by adding few ToString() --- src/Uno.Extensions.Reactive/Core/Axes/MessageAxis.cs | 7 +++++++ .../Core/Axes/MessageAxisValue.cs | 9 +++++++++ .../Core/Internal/MessageAxisUpdate.cs | 4 ++++ 3 files changed, 20 insertions(+) diff --git a/src/Uno.Extensions.Reactive/Core/Axes/MessageAxis.cs b/src/Uno.Extensions.Reactive/Core/Axes/MessageAxis.cs index 0cfbb52a3e..6809fe2c82 100644 --- a/src/Uno.Extensions.Reactive/Core/Axes/MessageAxis.cs +++ b/src/Uno.Extensions.Reactive/Core/Axes/MessageAxis.cs @@ -143,4 +143,11 @@ private static bool Equals(MessageAxis left, MessageAxis right) [Pure] public static bool operator !=(MessageAxis left, MessageAxis right) => !Equals(left, right); + + /// + [Pure] + public override string ToString() + => IsTransient + ? Identifier + "~" + : Identifier; } diff --git a/src/Uno.Extensions.Reactive/Core/Axes/MessageAxisValue.cs b/src/Uno.Extensions.Reactive/Core/Axes/MessageAxisValue.cs index 804af8a3d0..cfe6d8df41 100644 --- a/src/Uno.Extensions.Reactive/Core/Axes/MessageAxisValue.cs +++ b/src/Uno.Extensions.Reactive/Core/Axes/MessageAxisValue.cs @@ -43,4 +43,13 @@ public void Deconstruct(out bool isSet, out object? value) isSet = IsSet; value = Value; } + + /// + public override string ToString() + => (IsSet, Value) switch + { + (false, _) => "--unset--", + (true, null) => "--null--", + (_, var v) => v.ToString() + }; } diff --git a/src/Uno.Extensions.Reactive/Core/Internal/MessageAxisUpdate.cs b/src/Uno.Extensions.Reactive/Core/Internal/MessageAxisUpdate.cs index 73a8f2c118..6815b5df17 100644 --- a/src/Uno.Extensions.Reactive/Core/Internal/MessageAxisUpdate.cs +++ b/src/Uno.Extensions.Reactive/Core/Internal/MessageAxisUpdate.cs @@ -20,4 +20,8 @@ public MessageAxisUpdate(MessageAxis axis, MessageAxisValue value, IChangeSet? c public (MessageAxisValue value, IChangeSet? changes) GetValue(MessageAxisValue parent, MessageAxisValue current) => Axis.GetLocalValue(parent, current, (Value, Changes)); + + /// + public override string ToString() + => $"{Axis} = {Value} {(Changes is not null ? "(with details)" : "")}"; } From 69891fe4d5b5d4598f1adbf504a2e12b3ed8cba3 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 20 Jun 2023 11:35:33 -0400 Subject: [PATCH 2/7] feat: Update the internal MessageManager to not produce an update if nothing has been updated Currenlty if nothig has changed, it would still remove all IsTransient axes. --- .../Core/Given_MessageBuilder.cs | 12 +++++----- .../Core/Internal/MessageManager.cs | 21 ++++++++++------ .../Core/MessageBuilder.TParent.cs | 24 +++++++++---------- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/src/Uno.Extensions.Reactive.Tests/Core/Given_MessageBuilder.cs b/src/Uno.Extensions.Reactive.Tests/Core/Given_MessageBuilder.cs index 56d4e65cb3..4008e8180f 100644 --- a/src/Uno.Extensions.Reactive.Tests/Core/Given_MessageBuilder.cs +++ b/src/Uno.Extensions.Reactive.Tests/Core/Given_MessageBuilder.cs @@ -34,12 +34,12 @@ public async Task When_TransientAxis_Then_Dismissed_OfT() var original = manager.Current; // Note: Even if we don't set any axis in Update, the Current message should be updated only by the fact that the transient axis has been automatically removed - MessageBuilder builder = default; + MessageBuilder? builder = default; manager.Update(current => builder = current.With(), CT); var updated = manager.Current; original.Current[myAxis].IsSet.Should().BeTrue(); - builder.Get(myAxis).value.IsSet.Should().BeFalse("transient axis should have been cleared at beginning to reflect resulting state"); + builder!.Get(myAxis).value.IsSet.Should().BeFalse("transient axis should have been cleared at beginning to reflect resulting state"); updated.Current[myAxis].IsSet.Should().BeFalse("transient axis should have been cleared on update"); } @@ -53,12 +53,12 @@ public async Task When_TransientAxisByParent_Then_NotDismissed_OfT() var original = manager.Current; // Note: Even if we don't set any axis in Update, the Current message should be updated only by the fact that the transient axis has been automatically removed - MessageBuilder builder = default; + MessageBuilder? builder = default; manager.Update(current => builder = current.With(), CT); var updated = manager.Current; original.Current[myAxis].IsSet.Should().BeTrue(); - builder.Get(myAxis).value.IsSet.Should().BeTrue("transient axis should have been kept as it was defined on parent"); + builder!.Get(myAxis).value.IsSet.Should().BeTrue("transient axis should have been kept as it was defined on parent"); updated.Current[myAxis].IsSet.Should().BeTrue("transient axis should have been kept as it was defined on parent"); } @@ -72,12 +72,12 @@ public async Task When_TransientAxisByParentAndLocally_Then_NotDismissed_OfT() var original = manager.Current; // Note: Even if we don't set any axis in Update, the Current message should be updated only by the fact that the transient axis has been automatically removed - MessageBuilder builder = default; + MessageBuilder? builder = default; manager.Update(current => builder = current.With(), CT); var updated = manager.Current; original.Current[myAxis].IsSet.Should().BeTrue(); - builder.Get(myAxis).value.IsSet.Should().BeTrue("transient axis should have been kept as it was also defined on parent"); + builder!.Get(myAxis).value.IsSet.Should().BeTrue("transient axis should have been kept as it was also defined on parent"); updated.Current[myAxis].IsSet.Should().BeTrue("transient axis should have been kept as it was also defined on parent"); } diff --git a/src/Uno.Extensions.Reactive/Core/Internal/MessageManager.cs b/src/Uno.Extensions.Reactive/Core/Internal/MessageManager.cs index 77e1fd9a53..3f27138a6d 100644 --- a/src/Uno.Extensions.Reactive/Core/Internal/MessageManager.cs +++ b/src/Uno.Extensions.Reactive/Core/Internal/MessageManager.cs @@ -30,6 +30,7 @@ internal partial class MessageManager public Message Current => _local.result; // Locally, we only store a set of delegates that are upgrading the parent value into a local value. + // Compared to the 'defined' the 'applied' also contains transient updates of a _pendingUpdate transaction. private (_ChangeSet defined, _ChangeSet applied, Message result) _local; private bool _isFirstUpdate = true; @@ -61,21 +62,27 @@ public bool Update(Updater updater, TState state, CancellationTo lock (_gate) { - var (parent, locallyDefinedChangeSet) = updater(new CurrentMessage(this), state).GetResult(); + var (parent, hasUpdatedDefinedChangeSet, updatedDefinedChangeSet) = updater(new CurrentMessage(this), state).GetResult(); if (ct.IsCancellationRequested) { return false; } + // If nothing has been changed, so we prefer to use the previous defined change-set in order to avoid to just remove 'IsTransient' axes. + if (!hasUpdatedDefinedChangeSet) + { + updatedDefinedChangeSet = _local.defined; + } + // If we have any pending update transaction, we make sure to append its change set to the locally defined - var changeSetToApply = _pendingUpdate?.TransientUpdates is { Count: > 0 } transientUpdates - ? locallyDefinedChangeSet.ToDictionary().SetItems(transientUpdates) - : locallyDefinedChangeSet; + var updatedAppliedChangeSet = _pendingUpdate?.TransientUpdates is { Count: > 0 } transientUpdates + ? updatedDefinedChangeSet.ToDictionary().SetItems(transientUpdates) + : updatedDefinedChangeSet; // Finally apply the updates in order to get the new Local // Note: We append the _local.applied.Keys as if a transaction was removed, it's possible that some changes was removed - var possiblyChangedAxes = changeSetToApply.Keys.Concat(_local.applied.Keys); + var possiblyChangedAxes = updatedAppliedChangeSet.Keys.Concat(_local.applied.Keys); if (parent is not null && parent != _parent) // Note: parent should not be null if updated !!! { possiblyChangedAxes = possiblyChangedAxes.Concat(parent.Changes); @@ -95,7 +102,7 @@ public bool Update(Updater updater, TState state, CancellationTo // either the change is coming from the parent. // In all case we just need to propagate the value from the parent. var updated = (value: parentValue, changes: default(IChangeSet?)); - if (changeSetToApply.TryGetValue(axis, out var update)) + if (updatedAppliedChangeSet.TryGetValue(axis, out var update)) { updated = update.GetValue(parentValue, currentValue); } @@ -130,7 +137,7 @@ public bool Update(Updater updater, TState state, CancellationTo } _isFirstUpdate = false; - _local = (locallyDefinedChangeSet, changeSetToApply, new Message(Current.Current, new MessageEntry(values), changes)); + _local = (updatedDefinedChangeSet, updatedAppliedChangeSet, new Message(Current.Current, new MessageEntry(values), changes)); _send?.Invoke(Current); return true; } diff --git a/src/Uno.Extensions.Reactive/Core/MessageBuilder.TParent.cs b/src/Uno.Extensions.Reactive/Core/MessageBuilder.TParent.cs index 8d4f8d56ed..c12fc790ce 100644 --- a/src/Uno.Extensions.Reactive/Core/MessageBuilder.TParent.cs +++ b/src/Uno.Extensions.Reactive/Core/MessageBuilder.TParent.cs @@ -11,9 +11,10 @@ namespace Uno.Extensions.Reactive; /// /// Type of the value of the parent message. /// The type of the value of the message to build. -public readonly struct MessageBuilder : IMessageEntry, IMessageBuilder, IMessageBuilder +public sealed class MessageBuilder : IMessageEntry, IMessageBuilder, IMessageBuilder { private readonly Dictionary _updates; + private bool _hasUpdates; // This allows us to easily determine if we have changes no matter if we removed axis axises flagged has IsTransient. /// /// Creates a new message builder, including some changes (a.k.a. updates) that was previously made on the local message. @@ -28,8 +29,9 @@ internal MessageBuilder( Local = local.value; // We make sure to clear all transient axes when we update a message - // Note: We remove only "local" values, parent values are still propagated, it's there responsibility to remove them. + // Note: We remove only "local" values, parent values are still propagated, it's their responsibility to remove them. _updates = local.updates.ToDictionaryWhereKey(k => !k.IsTransient); + // _hasUpdates = false => Removing only transient axes is not considered as a change! } /// @@ -43,6 +45,7 @@ internal MessageBuilder(Message? parent, Message local) Local = local; _updates = new(); + _hasUpdates = true; // When we drop the local changes, we should consider that we have changes. } /// @@ -58,8 +61,8 @@ internal MessageBuilder(Message? parent, Message local) /// /// The new set of updates that has been defined on this builder /// - internal (Message? parent, IReadOnlyDictionary updates) GetResult() - => (Parent, _updates); + internal (Message? parent, bool hasUpdates, IReadOnlyDictionary updates) GetResult() + => (Parent, _hasUpdates, _updates); Option IMessageEntry.Data => CurrentData; Exception? IMessageEntry.Error => CurrentError; @@ -77,14 +80,10 @@ internal MessageBuilder(Message? parent, Message local) { var parentValue = Parent?.Current[axis] ?? MessageAxisValue.Unset; var localValue = Local.Current[axis]; - if (_updates.TryGetValue(axis, out var updater)) - { - return updater.GetValue(parentValue, localValue); - } - else - { - return (parentValue, default); - } + + return _updates.TryGetValue(axis, out var updater) + ? updater.GetValue(parentValue, localValue) + : (parentValue, default); } /// @@ -95,6 +94,7 @@ internal MessageBuilder Set(MessageAxis axis, MessageAxisValue { // Note: We are not validating the axis.AreEquals as changes are detected by the MessageManager itself. _updates[axis] = new MessageAxisUpdate(axis, value, changes); + _hasUpdates = true; return this; } } From 447910415890789fa7dc03f6267a044bfd047b1b Mon Sep 17 00:00:00 2001 From: David Date: Tue, 20 Jun 2023 11:38:05 -0400 Subject: [PATCH 3/7] chore: Refactor state to always relay on the UpdateFeed State is now only a FeedSubscription holder of an UpdateFeed. --- .../Core/Internal/FeedSubscription.cs | 15 +- .../Core/Internal/IStateStore.cs | 9 +- .../Core/Internal/SourceContext.cs | 7 +- .../Core/Internal/StateImpl.cs | 260 +++++------------- .../Core/Internal/StateSubscriptionMode.cs | 28 -- .../Core/Internal/SubscriptionMode.cs | 31 +++ .../ReplayOneAsyncEnumerable.cs | 12 +- 7 files changed, 139 insertions(+), 223 deletions(-) delete mode 100644 src/Uno.Extensions.Reactive/Core/Internal/StateSubscriptionMode.cs create mode 100644 src/Uno.Extensions.Reactive/Core/Internal/SubscriptionMode.cs diff --git a/src/Uno.Extensions.Reactive/Core/Internal/FeedSubscription.cs b/src/Uno.Extensions.Reactive/Core/Internal/FeedSubscription.cs index f6c72498ea..7de6330517 100644 --- a/src/Uno.Extensions.Reactive/Core/Internal/FeedSubscription.cs +++ b/src/Uno.Extensions.Reactive/Core/Internal/FeedSubscription.cs @@ -26,6 +26,7 @@ internal class FeedSubscription internal class FeedSubscription : IAsyncDisposable, ISourceContextOwner { + private readonly ISignal> _feed; private readonly SourceContext _rootContext; private readonly CompositeRequestSource _requests = new(); private readonly SourceContext _context; @@ -33,6 +34,7 @@ internal class FeedSubscription : IAsyncDisposable, ISourceContextOwner public FeedSubscription(ISignal> feed, SourceContext rootContext) { + _feed = feed; _rootContext = rootContext; _context = rootContext.CreateChild(this, _requests); _source = new ReplayOneAsyncEnumerable>( @@ -40,14 +42,21 @@ public FeedSubscription(ISignal> feed, SourceContext rootContext) isInitialSyncValuesSkippingAllowed: FeedSubscription.IsInitialSyncValuesSkippingAllowed); } - string ISourceContextOwner.Name => $"Sub on {_source} for ctx '{_context.Parent!.Owner.Name}'."; + string ISourceContextOwner.Name => $"Sub on '{_feed}' for ctx '{_context.Parent!.Owner.Name}'."; IDispatcher? ISourceContextOwner.Dispatcher => null; + internal Message Current => _source.TryGetCurrent(out var value) ? value : Message.Initial; + + public IDisposable UpdateMode(SubscriptionMode mode) + { + // Not supported yet. + // Here we should compute the stricter mode + return Disposable.Empty; + } public async IAsyncEnumerable> GetMessages(SourceContext subscriberContext, [EnumeratorCancellation] CancellationToken ct) { - Debug.Assert(subscriberContext.RootId == _context.RootId); if (subscriberContext != _rootContext) { _requests.Add(subscriberContext.RequestSource, ct); @@ -83,6 +92,6 @@ public async ValueTask DisposeAsync() { await _context.DisposeAsync(); _requests.Dispose(); - _source.Dispose(); + await _source.DisposeAsync(); } } diff --git a/src/Uno.Extensions.Reactive/Core/Internal/IStateStore.cs b/src/Uno.Extensions.Reactive/Core/Internal/IStateStore.cs index db115cbc05..acee226395 100644 --- a/src/Uno.Extensions.Reactive/Core/Internal/IStateStore.cs +++ b/src/Uno.Extensions.Reactive/Core/Internal/IStateStore.cs @@ -4,8 +4,11 @@ namespace Uno.Extensions.Reactive.Core; /// -/// A cache of used by a . +/// A cache of and used by a . /// +/// +/// This is the class responsible to that hold the "state" (the generic term, i.e. a persistent value) of the subscriptions made by the owner on feeds. +/// internal interface IStateStore : IAsyncDisposable { /// @@ -37,6 +40,10 @@ FeedSubscription GetOrCreateSubscription(TSource source /// Factory to build the state is not present yet in the cache. /// The state wrapping the given feed /// This store has been disposed. + /// + /// If the the returned state makes any subscription to a feed, + /// it's expected that it will share that subscription with other subscribers of the current context (i.e. it uses the ). + /// TState GetOrCreateState(TSource source, Func factory) where TSource : class where TState : IState; diff --git a/src/Uno.Extensions.Reactive/Core/Internal/SourceContext.cs b/src/Uno.Extensions.Reactive/Core/Internal/SourceContext.cs index b5669107b2..7e0ee48769 100644 --- a/src/Uno.Extensions.Reactive/Core/Internal/SourceContext.cs +++ b/src/Uno.Extensions.Reactive/Core/Internal/SourceContext.cs @@ -33,6 +33,11 @@ public sealed class SourceContext : IAsyncDisposable private static readonly AsyncLocal _current = new(); private static readonly ConditionalWeakTable _contexts = new(); + /// + /// Gets the none context. + /// + internal static SourceContext None => _none; + /// /// Gets the current context. /// @@ -163,7 +168,7 @@ private SourceContext(SourceContext parent, ISourceContextOwner owner, IStateSto RootId = parent.RootId; Parent = parent; Owner = owner; - States = states ?? parent.States; + States = states ?? parent.States; // Note: A child StateStore should forward request to its parent store! RequestSource = requests ?? parent.RequestSource; } diff --git a/src/Uno.Extensions.Reactive/Core/Internal/StateImpl.cs b/src/Uno.Extensions.Reactive/Core/Internal/StateImpl.cs index 818ae14252..b0ffcf1bae 100644 --- a/src/Uno.Extensions.Reactive/Core/Internal/StateImpl.cs +++ b/src/Uno.Extensions.Reactive/Core/Internal/StateImpl.cs @@ -1,26 +1,19 @@ using System; -using System.Collections.Generic; using System.ComponentModel; -using System.Diagnostics.CodeAnalysis; using System.Linq; -using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; using Uno.Extensions.Reactive.Operators; -using Uno.Extensions.Reactive.Utils; +using Uno.Extensions.Reactive.Sources; namespace Uno.Extensions.Reactive.Core; -internal sealed class StateImpl : IState, IFeed, IAsyncDisposable, IStateImpl, ISourceContextOwner +internal sealed class StateImpl : IState, IFeed, IAsyncDisposable, IStateImpl { - private readonly object _updateGate = new(); - private readonly UpdateFeed? _updates; - private readonly IForEachRunner? _innerEnumeration; - private readonly CompositeRequestSource _requests = new(); + private readonly SubscriptionMode _mode; + private readonly StateUpdateKind _updatesKind; + private readonly UpdateFeed _inner; - private bool _hasCurrent; - private Message _current = Message.Initial; - private TaskCompletionSource? _next = new(); // Do not use attached: we don't want to keep the creating context alive and leak this State + private FeedSubscription? _subscription; + private IDisposable? _subscriptionMode; /// /// Gets the context to which this state belongs. @@ -28,230 +21,123 @@ internal sealed class StateImpl : IState, IFeed, IAsyncDisposable, ISta public SourceContext Context { get; } SourceContext IStateImpl.Context => Context; - internal Message Current => _current; + internal Message Current => _subscription?.Current ?? Message.Initial; - string ISourceContextOwner.Name => $"State<{typeof(T).Name}> for ctx '{Context.Parent!.Owner.Name}'."; - - IDispatcher? ISourceContextOwner.Dispatcher => null; + /// + /// Gets direct access to the underlying UpdateFeed so we can have full control of update operation made on it. + /// + internal UpdateFeed Inner => _inner; - private readonly struct Node + /// + /// Legacy - Used only be legacy IInput syntax + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public StateImpl(Option defaultValue) + : this(SourceContext.None, defaultValue) { - private readonly Message _value; - private readonly TaskCompletionSource _next; - - public Node(Message value, TaskCompletionSource next) - { - _value = value; - _next = next; - } + } - public void Deconstruct(out Message current, out TaskCompletionSource next) - { - current = _value; - next = _next; - } + public StateImpl(SourceContext context, Option defaultValue) + : this(context, new AsyncFeed(async _ => defaultValue), SubscriptionMode.Eager, StateUpdateKind.Persistent) + { } public StateImpl( SourceContext context, IFeed feed, - StateSubscriptionMode mode = StateSubscriptionMode.Default, + SubscriptionMode mode = SubscriptionMode.Default, StateUpdateKind updatesKind = StateUpdateKind.Volatile) { - Context = context.CreateChild(this, _requests); + Context = context; + + _mode = mode; + _updatesKind = updatesKind; + _inner = new UpdateFeed(feed); if (updatesKind is StateUpdateKind.Persistent) { - // Note: We expect to use the UpdateFeed for all kind of updates, but to avoid silent breaking changes, - // for now we use it only when using the new Persistent mode. - feed = _updates = new UpdateFeed(feed); - // If updates has to be persistent, the subscription to the UpdateFeed must remain active. - mode &= ~StateSubscriptionMode.RefCounted; + _mode &= ~SubscriptionMode.RefCounted; } - _innerEnumeration = mode.HasFlag(StateSubscriptionMode.RefCounted) - ? new RefCountedForEachRunner>(GetSource, UpdateState) - : new ForEachRunner>(GetSource, UpdateState); - if (mode.HasFlag(StateSubscriptionMode.Eager)) + if (mode.HasFlag(SubscriptionMode.Eager)) { - _innerEnumeration.Prefetch(); + // Note: Once the dynamic updates of the subscription mode in supported in FeedSubscription, + // we will be able to unconditionally create the _subscription and just push the mode on it instead! + Enable(); } - - IAsyncEnumerable> GetSource() - => feed.GetSource(Context); - - ValueTask UpdateState(Message newSrcMsg, CancellationToken ct) - // Note: When using the StateUpdateKind.Persistent, the newScrMsg is known to be the same as the currentStateMsg - // (as updates are made on the source 'feed' instead of locally), so this is equivalent to: - //=> UpdateCore(_ => newSrcMsg, ct); // Note2 : OverrideBy is optimized to handle that case! - => UpdateCore(currentStateMsg => currentStateMsg.OverrideBy(newSrcMsg), ct); } - public StateImpl(SourceContext context, Option defaultValue) + public IAsyncEnumerable> GetSource(SourceContext context, CancellationToken ct = default) { - Context = context?.CreateChild(this, _requests)!; // Null check override only for legacy IInput support + Enable(); - _hasCurrent = true; // Even if undefined, we consider that we do have a value in order to produce an initial state - if (!defaultValue.IsUndefined()) - { - _current = _current.With().Data(defaultValue); - } + // Note: The subscription has been created using our own Context, and we forward the subscriber context only for requests propagation. + return _subscription!.GetMessages(context, ct); } - /// - /// Legacy - Used only be legacy IInput syntax - /// - [EditorBrowsable(EditorBrowsableState.Never)] - public StateImpl(Option defaultValue) - : this(null!, defaultValue) + /// + public async ValueTask UpdateMessage(Action> updater, CancellationToken ct) { - } + // First we make sure that the UpdateFeed is active, so the update will be applied ^^ + Enable(); - internal IAsyncEnumerable> GetSource(CancellationToken ct) - => GetSource(Context, ct); + var update = new Update(updater, _updatesKind); + _inner.Add(update); + await update.HasBeenApplied; // Makes sure to forward (the first) error to the caller if any. + } - public async IAsyncEnumerable> GetSource(SourceContext context, [EnumeratorCancellation] CancellationToken ct = default) + private void Enable() { - using var _ = _innerEnumeration?.Enable(); - - if (Context is not null /*Legacy IInput support*/ && context != Context) - { - _requests.Add(context.RequestSource, ct); - } - - var isFirstMessage = true; - TaskCompletionSource? next; - (bool hasMessage, Message message) initial; - lock(_updateGate) - { - // We access to the _current only in the _updateGate, so we make sure that we would never miss or replay a value - // by listening to the _next too late/early - initial = (_hasCurrent, _current); - - next = _next; - } - - if (initial.hasMessage) - { - yield return Message.Initial.OverrideBy(initial.message); - isFirstMessage = false; - } - - while (!ct.IsCancellationRequested && next is not null) + if (_subscription is not null) { - Message current; - try - { - (current, next) = await next.Task.ConfigureAwait(false); - } - catch (TaskCanceledException) - { - yield break; - } - - if (isFirstMessage) - { - current = Message.Initial.OverrideBy(current); - isFirstMessage = false; - } - - yield return current; + return; } - if (isFirstMessage) + // Note: The subscription has to be created using our own Context, not the one of our subscribers. + var subscription = Context.States.GetOrCreateSubscription, T>(_inner); + if (Interlocked.CompareExchange(ref _subscription, subscription, null) is null) { - yield return Message.Initial; + _subscriptionMode = _subscription.UpdateMode(_mode); } } /// - public ValueTask UpdateMessage(Action> updater, CancellationToken ct) + public async ValueTask DisposeAsync() { - if (_updates is not null) - { - // WARNING: There is a MAJOR issue here: if updater fails, the error will be propagated into the output feed instead of the caller! - // This is acceptable for now as so far there is no way to reach this code from public API - - // First we make sure that the UpdateFeed is active, so the update will be applied ^^ - _innerEnumeration?.Enable(); - return _updates.Update((_, _) => true, (_, msg) => updater(new(msg.Get, ((IMessageBuilder)msg).Set)), ct); - } - else + // Note: As the _innerFeed as been created by us, we dispose the _subscription (even it belongs to the StateStore) + // in order to make sure to release the original underlying feed. + // This is a temporary patch until we support dynamic updates of the subscription mode in FeedSubscription (i.e. the _subscriptionMode). + if (_subscription is { } sub) { - // Compatibility mode - return UpdateCore(msg => msg.With().Apply(updater).Build(), ct); + await sub.DisposeAsync(); } + _subscriptionMode?.Dispose(); } - private async ValueTask UpdateCore(Func, Message> updater, CancellationToken ct) + private record Update(Action> Method, StateUpdateKind Kind) : IFeedUpdate { - if (_next is null) - { - return; - } - - Message updated; - TaskCompletionSource? current, next; - lock (_updateGate) - { - updated = updater(_current); + private readonly TaskCompletionSource _firstResult = new(); - if (_current.Current != updated.Previous) - { - throw new InvalidOperationException( - "The updated message is not based on the current message. " - + "You must use the Message.With() or Message.OverrideBy() to create a new version."); - } - - if (_hasCurrent && updated is { Changes.Count: 0 }) - { - return; - } - - if (!MoveNext(out current, out next, ct)) - { - return; - } - - _current = updated; - _hasCurrent = true; - } + public Task HasBeenApplied => _firstResult.Task; - current.TrySetResult(new Node(updated, next)); - } + /// + public bool IsActive(bool parentChanged, MessageBuilder message) + => !_firstResult.Task.IsFaulted + && (Kind is StateUpdateKind.Persistent || !parentChanged || !_firstResult.Task.IsCompleted); - private bool MoveNext( - [NotNullWhen(true)] out TaskCompletionSource? current, - [NotNullWhen(true)] out TaskCompletionSource? next, - CancellationToken ct) - { - next = new TaskCompletionSource(); - while (true) + /// + public void Apply(bool parentChanged, MessageBuilder message) { - current = _next; - if (ct.IsCancellationRequested || current is null) + try { - // Update has been aborted or State has been disposed - return false; + Method(new(message.Get, ((IMessageBuilder)message).Set)); + _firstResult.TrySetResult(null); } - - if (Interlocked.CompareExchange(ref _next, next, current) == current) + catch (Exception error) when (!_firstResult.Task.IsCompleted) // Otherwise let the exception bubble up. { - return true; + _firstResult.TrySetException(error); } } } - - /// - public async ValueTask DisposeAsync() - { - if (Context is not null) - { - await Context.DisposeAsync(); - } - _requests.Dispose(); // Safety only, should have already been disposed by the Context - _innerEnumeration?.Dispose(); - Interlocked.Exchange(ref _next, null)?.TrySetCanceled(); - } } diff --git a/src/Uno.Extensions.Reactive/Core/Internal/StateSubscriptionMode.cs b/src/Uno.Extensions.Reactive/Core/Internal/StateSubscriptionMode.cs deleted file mode 100644 index c22fac11ba..0000000000 --- a/src/Uno.Extensions.Reactive/Core/Internal/StateSubscriptionMode.cs +++ /dev/null @@ -1,28 +0,0 @@ -using System; -using System.Linq; - -namespace Uno.Extensions.Reactive; - -[Flags] -internal enum StateSubscriptionMode -{ - /// - /// Underlying feed is enumerated only once the state is being enumerated. - /// - Lazy = 0, - - /// - /// Underlying feed is enumerated at the creation of the state. - /// - Eager = 1, - - /// - /// The enumeration of the underlying feed is stopped as soon as this state is no longer enumerated. - /// - RefCounted = 2, - - /// - /// The default is Lazy, not ref counted. - /// - Default = Lazy, -} diff --git a/src/Uno.Extensions.Reactive/Core/Internal/SubscriptionMode.cs b/src/Uno.Extensions.Reactive/Core/Internal/SubscriptionMode.cs new file mode 100644 index 0000000000..5dfa5aebef --- /dev/null +++ b/src/Uno.Extensions.Reactive/Core/Internal/SubscriptionMode.cs @@ -0,0 +1,31 @@ +using System; +using System.Linq; + +namespace Uno.Extensions.Reactive; + +/// +/// For a wrapper (e.g. a state) of a publisher (e.g. a feed), indicates how the underlying publisher should be subscribed. +/// +[Flags] +internal enum SubscriptionMode +{ + /// + /// Underlying publisher is enumerated only once the wrapper is being enumerated. + /// + Lazy = 0, + + /// + /// Underlying publisher is enumerated at the creation of the wrapper. + /// + Eager = 1, + + /// + /// The enumeration of the underlying publisher is stopped as soon as the wrapper is no longer enumerated. + /// + RefCounted = 2, + + /// + /// The default is Lazy, not ref counted. + /// + Default = Lazy, +} diff --git a/src/Uno.Extensions.Reactive/Utils/AsyncEnumerables/ReplayOneAsyncEnumerable.cs b/src/Uno.Extensions.Reactive/Utils/AsyncEnumerables/ReplayOneAsyncEnumerable.cs index 44e5fc3c72..5d346c42d7 100644 --- a/src/Uno.Extensions.Reactive/Utils/AsyncEnumerables/ReplayOneAsyncEnumerable.cs +++ b/src/Uno.Extensions.Reactive/Utils/AsyncEnumerables/ReplayOneAsyncEnumerable.cs @@ -25,12 +25,12 @@ private static class State public const int Disposed = int.MaxValue; } - public ReplayOneAsyncEnumerable(IAsyncEnumerable inner, StateSubscriptionMode mode = StateSubscriptionMode.Lazy, bool isInitialSyncValuesSkippingAllowed = true) + public ReplayOneAsyncEnumerable(IAsyncEnumerable inner, SubscriptionMode mode = SubscriptionMode.Lazy, bool isInitialSyncValuesSkippingAllowed = true) { _inner = inner; _isInitialSyncValuesSkippingAllowed = isInitialSyncValuesSkippingAllowed; - if (mode.HasFlag(StateSubscriptionMode.Eager)) + if (mode.HasFlag(SubscriptionMode.Eager)) { Enable(); } @@ -44,6 +44,13 @@ public void Enable() // a.k.a. Connect() } } + /// + /// Gets the last value enumerated from the source. + /// This IS NOT the current value of an enumerator from . + /// + public bool TryGetCurrent([NotNullWhen(true)] out T value) + => _current.TryGetValue(out value); + public void Disable() { // Not supported yet @@ -95,7 +102,6 @@ public async IAsyncEnumerator GetAsyncEnumerator(CancellationToken ct) var needsToEnableEnumeration = true; if (_isInitialSyncValuesSkippingAllowed) { - Enable(); needsToEnableEnumeration = false; } From 178cadba4703f86b65b5ff53e58dc27e2ea1dadd Mon Sep 17 00:00:00 2001 From: David Date: Tue, 20 Jun 2023 11:42:45 -0400 Subject: [PATCH 4/7] chore: Update the ListFeedSelection to use the new State structure --- .../Operators/ListFeedSelection.cs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/Uno.Extensions.Reactive/Operators/ListFeedSelection.cs b/src/Uno.Extensions.Reactive/Operators/ListFeedSelection.cs index 3b5595abf8..c90ccb8d9f 100644 --- a/src/Uno.Extensions.Reactive/Operators/ListFeedSelection.cs +++ b/src/Uno.Extensions.Reactive/Operators/ListFeedSelection.cs @@ -20,7 +20,7 @@ internal sealed class ListFeedSelection : IListState, private readonly Func, Option, SelectionInfo> _otherToSelection; private readonly CancellationTokenSource _ct; private readonly string _name; - private readonly StateSubscriptionMode _mode; + private readonly SubscriptionMode _mode; private int _state = State.New; private IState>? _impl; @@ -38,7 +38,7 @@ public ListFeedSelection( Func, SelectionInfo, Option, Option> selectionToOther, Func, Option, SelectionInfo> otherToSelection, string logTag, - StateSubscriptionMode mode = StateSubscriptionMode.Default) + SubscriptionMode mode = SubscriptionMode.Default) { _source = source; _selectionState = selectionState; @@ -52,7 +52,7 @@ public ListFeedSelection( _ct = CancellationTokenSource.CreateLinkedTokenSource(ctx.Token); // We however allow early dispose of this state Context = ctx; - if (mode is StateSubscriptionMode.Eager) + if (mode is SubscriptionMode.Eager) { Enable(); } @@ -108,10 +108,9 @@ private IState> Enable() case State.Enabled: return _impl!; } - var feed = new UpdateFeed>(_source.AsFeed()); - var impl = new StateImpl>(Context, feed, StateSubscriptionMode.Eager); + var impl = new StateImpl>(Context, _source.AsFeed(), SubscriptionMode.Eager); - SelectionFeedUpdate? selectionFromState = null; + SelectionFeedUpdate? currentSelectionFromState = null; Context .GetOrCreateSource(_selectionState) @@ -129,18 +128,18 @@ async ValueTask SyncFromStateToList(Message otherMsg, CancellationToken { // Note: We sync only the SelectedItem. Error, Transient and any other axis are **not** expected to flow between the 2 sources. - var stateSelection = new SelectionFeedUpdate(this, otherMsg.Current.Data); + var updatedSelectionFromState = new SelectionFeedUpdate(this, otherMsg.Current.Data); - if (selectionFromState is not null) + if (currentSelectionFromState is not null) { - feed.Replace(selectionFromState, stateSelection); + impl.Inner.Replace(currentSelectionFromState, updatedSelectionFromState); } else { - feed.Add(stateSelection); + impl.Inner.Add(updatedSelectionFromState); } - selectionFromState = stateSelection; + currentSelectionFromState = updatedSelectionFromState; } async ValueTask SyncFromListToState(Message> implMsg, CancellationToken ct) From 98e6bab167dbfb60800cdcb5dae98358b180026f Mon Sep 17 00:00:00 2001 From: David Date: Tue, 20 Jun 2023 11:43:29 -0400 Subject: [PATCH 5/7] chore: Add ability for the UpdateFeed to wait for parent value before applying updates --- .../Operators/UpdateFeed.cs | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/src/Uno.Extensions.Reactive/Operators/UpdateFeed.cs b/src/Uno.Extensions.Reactive/Operators/UpdateFeed.cs index 8afb611693..2998e46c8e 100644 --- a/src/Uno.Extensions.Reactive/Operators/UpdateFeed.cs +++ b/src/Uno.Extensions.Reactive/Operators/UpdateFeed.cs @@ -20,16 +20,18 @@ internal record FeedUpdate(Func, bool> IsActive, A { bool IFeedUpdate.IsActive(bool parentChanged, MessageBuilder message) => IsActive(parentChanged, message); void IFeedUpdate.Apply(bool parentChanged, MessageBuilder message) => Apply(parentChanged, message); -}; +} internal sealed class UpdateFeed : IFeed { private readonly AsyncEnumerableSubject<(IFeedUpdate[]? added, IFeedUpdate[]? removed)> _updates = new(ReplayMode.Disabled); private readonly IFeed _source; + private readonly Predicate>? _waitForParent; - public UpdateFeed(IFeed source) + public UpdateFeed(IFeed source, Predicate>? waitForParent = null) { _source = source; + _waitForParent = waitForParent; } public async ValueTask Update(Func, bool> predicate, Action> updater, CancellationToken ct) @@ -53,19 +55,23 @@ public IAsyncEnumerable> GetSource(SourceContext context, Cancellatio private class UpdateFeedSource : IAsyncEnumerable> { + private readonly UpdateFeed _owner; private readonly CancellationToken _ct; private readonly AsyncEnumerableSubject> _subject; private readonly MessageManager _message; private ImmutableList> _activeUpdates; private bool _isInError; + private bool _isParentReady; public UpdateFeedSource(UpdateFeed owner, SourceContext context, CancellationToken ct) { + _owner = owner; _ct = ct; _subject = new AsyncEnumerableSubject>(ReplayMode.EnabledForFirstEnumeratorOnly); _message = new MessageManager(_subject.SetNext); _activeUpdates = ImmutableList>.Empty; + _isParentReady = owner._waitForParent is null; // mode=AbortPrevious => When we receive a new update, we can abort the update and start a new one owner._updates.ForEachAsync(OnUpdateReceived, ct); @@ -80,6 +86,11 @@ private void OnUpdateReceived((IFeedUpdate[]? added, IFeedUpdate[]? remove { lock (this) { + if (!_isParentReady) + { + return; + } + var canDoIncrementalUpdate = !_isInError; if (args.removed is { Length: > 0 } removed) { @@ -106,6 +117,11 @@ private void OnParentUpdated(Message parentMsg) { lock (this) { + if (!_isParentReady) + { + _isParentReady |= _owner._waitForParent!.Invoke(parentMsg); + } + RebuildMessage(parentMsg); } } @@ -127,7 +143,8 @@ private void IncrementalUpdate(IFeedUpdate[] updates) catch (Exception error) { _isInError = true; - return current.WithParentOnly(null) + return current + .WithParentOnly(null) .Data(Option.Undefined()) .Error(error); } From 016458649bbab59514752d30fb059827941867b4 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 20 Jun 2023 13:36:43 -0400 Subject: [PATCH 6/7] test: Adjust tests to comply with the new unifiy implementation of State --- .../FeedTestContext.cs | 6 --- .../Core/Given_FeedSubscription.cs | 8 +++- .../Core/Given_MessageBuilder.cs | 46 +++++++++++++++++-- .../Core/Given_StateImpl.cs | 5 +- .../Given_ListFeed.cs | 4 ++ .../Operators/Given_CombineFeed.cs | 4 +- .../Operators/Given_FeedToListFeedAdapter.cs | 6 +++ .../Operators/Given_ListFeedSelection.cs | 12 ++--- .../Core/Internal/FeedSubscription.cs | 14 +----- .../AsyncEnumerableSubject.cs | 4 +- 10 files changed, 75 insertions(+), 34 deletions(-) diff --git a/src/Uno.Extensions.Reactive.Testing/FeedTestContext.cs b/src/Uno.Extensions.Reactive.Testing/FeedTestContext.cs index fb65185187..7437e92121 100644 --- a/src/Uno.Extensions.Reactive.Testing/FeedTestContext.cs +++ b/src/Uno.Extensions.Reactive.Testing/FeedTestContext.cs @@ -34,9 +34,6 @@ public FeedTestContext(TestContext testContext) _subscription = SourceContext.AsCurrent(); testContext?.CancellationTokenSource.Token.Register(Dispose); - - // For tests we prefer to replay all vales - FeedSubscription.IsInitialSyncValuesSkippingAllowed = false; } /// @@ -49,9 +46,6 @@ public FeedTestContext([CallerMemberName] string? name = null) _name = name ?? throw new ArgumentNullException("Context must be named."); SourceContext = SourceContext.GetOrCreate(this); _subscription = SourceContext.AsCurrent(); - - // For tests we prefer to replay all vales - FeedSubscription.IsInitialSyncValuesSkippingAllowed = false; } /// diff --git a/src/Uno.Extensions.Reactive.Tests/Core/Given_FeedSubscription.cs b/src/Uno.Extensions.Reactive.Tests/Core/Given_FeedSubscription.cs index 3f946f9902..4db0014015 100644 --- a/src/Uno.Extensions.Reactive.Tests/Core/Given_FeedSubscription.cs +++ b/src/Uno.Extensions.Reactive.Tests/Core/Given_FeedSubscription.cs @@ -34,6 +34,9 @@ public async Task When_SubscribeTwice_Then_Replay() async IAsyncEnumerable Source([EnumeratorCancellation] CancellationToken ct = default) { yield return 1; + + await Task.Yield(); // Make sure to run async, so listener will receive 1 message. + yield return 2; yield return 3; } @@ -41,9 +44,12 @@ async IAsyncEnumerable Source([EnumeratorCancellation] CancellationToken ct var sut = new FeedSubscription(src, Context.SourceContext); var sub1Message = await sut.GetMessages(Context.SourceContext, CT).FirstAsync(CT); + + await Task.Delay(10); // Make sure to run async, so listener will receive next messages. + var sub2Message = await sut.GetMessages(Context.SourceContext, CT).FirstAsync(CT); - sub1Message.Current.Data.SomeOrDefault().Should().Be(1, "We do not allow to ignore first values in test (cf. FeedSubscription.IsInitialSyncValuesSkippingAllowed)"); + sub1Message.Current.Data.SomeOrDefault().Should().Be(1, "We added a delay before the second value"); sub2Message.Current.Data.SomeOrDefault().Should().Be(3, "the subscription should have stay active"); } diff --git a/src/Uno.Extensions.Reactive.Tests/Core/Given_MessageBuilder.cs b/src/Uno.Extensions.Reactive.Tests/Core/Given_MessageBuilder.cs index 4008e8180f..e340d7cd8f 100644 --- a/src/Uno.Extensions.Reactive.Tests/Core/Given_MessageBuilder.cs +++ b/src/Uno.Extensions.Reactive.Tests/Core/Given_MessageBuilder.cs @@ -26,18 +26,37 @@ public void When_TransientAxis_Then_Dismissed() } [TestMethod] - public async Task When_TransientAxis_Then_Dismissed_OfT() + public async Task When_TransientAxisNoUpdate_Then_NotDismissed_OfT() { var myAxis = new MessageAxis("my test axis", _ => new object()) { IsTransient = true }; var manager = new MessageManager(); manager.Update(current => current.With().Set(myAxis, new object()), CT); var original = manager.Current; - // Note: Even if we don't set any axis in Update, the Current message should be updated only by the fact that the transient axis has been automatically removed + // Note: We need to change at least one value for the Current message to be updated (and the transient axis has been automatically removed) MessageBuilder? builder = default; manager.Update(current => builder = current.With(), CT); var updated = manager.Current; + original.Current[myAxis].IsSet.Should().BeTrue(); + builder!.Get(myAxis).value.IsSet.Should().BeFalse("transient axis should have been cleared at beginning to reflect resulting state"); + updated.Current[myAxis].IsSet.Should().BeTrue("transient axis should have not have been cleared has no valid update was performed"); + } + + [TestMethod] + public async Task When_TransientAxis_Then_Dismissed_OfT() + { + var myAxis = new MessageAxis("my test axis", _ => new object()) { IsTransient = true }; + var myAxis2 = new MessageAxis("my test axis 2", _ => new object()); + var manager = new MessageManager(); + manager.Update(current => current.With().Set(myAxis, new object()), CT); + var original = manager.Current; + + // Note: We need to change at least one value for the Current message to be updated (and the transient axis has been automatically removed) + MessageBuilder? builder = default; + manager.Update(current => builder = current.With().Set(myAxis2, new object()), CT); + var updated = manager.Current; + original.Current[myAxis].IsSet.Should().BeTrue(); builder!.Get(myAxis).value.IsSet.Should().BeFalse("transient axis should have been cleared at beginning to reflect resulting state"); updated.Current[myAxis].IsSet.Should().BeFalse("transient axis should have been cleared on update"); @@ -82,7 +101,7 @@ public async Task When_TransientAxisByParentAndLocally_Then_NotDismissed_OfT() } [TestMethod] - public async Task When_TransientAxis_Then_Dismissed_TransactionOfT() + public async Task When_TransientAxisNoUpdate_Then_NotDismissed_TransactionOfT() { var myAxis = new MessageAxis("my test axis", _ => new object()) { IsTransient = true }; var manager = new MessageManager(); @@ -96,6 +115,27 @@ public async Task When_TransientAxis_Then_Dismissed_TransactionOfT() transaction.Update(current => builder = current.With()); var updated = manager.Current; + original.Current[myAxis].IsSet.Should().BeTrue(); + builder.Get(myAxis).value.IsSet.Should().BeFalse("transient axis should have been cleared at beginning to reflect resulting state"); + updated.Current[myAxis].IsSet.Should().BeTrue("transient axis should have not have been cleared has no valid update was performed"); + } + + [TestMethod] + public async Task When_TransientAxis_Then_Dismissed_TransactionOfT() + { + var myAxis = new MessageAxis("my test axis", _ => new object()) { IsTransient = true }; + var myAxis2 = new MessageAxis("my test axis 2", _ => new object()); + var manager = new MessageManager(); + manager.Update(current => current.With().Set(myAxis, new object()), CT); + var original = manager.Current; + + // Note: Even if we don't set any axis in Update, the Current message should be updated only by the fact that the transient axis has been automatically removed + // Note 2: No needs to Commit the transaction, manager.Current should already reflect changes + MessageManager.UpdateTransaction.MessageBuilder builder = default; + using var transaction = manager.BeginUpdate(CT); + transaction.Update(current => builder = current.With().Set(myAxis2, new object())); + var updated = manager.Current; + original.Current[myAxis].IsSet.Should().BeTrue(); builder.Get(myAxis).value.IsSet.Should().BeFalse("transient axis should have been cleared at beginning to reflect resulting state"); updated.Current[myAxis].IsSet.Should().BeFalse("transient axis should have been cleared on update"); diff --git a/src/Uno.Extensions.Reactive.Tests/Core/Given_StateImpl.cs b/src/Uno.Extensions.Reactive.Tests/Core/Given_StateImpl.cs index a497ce15c6..48e6cb6889 100644 --- a/src/Uno.Extensions.Reactive.Tests/Core/Given_StateImpl.cs +++ b/src/Uno.Extensions.Reactive.Tests/Core/Given_StateImpl.cs @@ -19,7 +19,10 @@ public async Task When_Create_Then_TaskDoNotLeak() { var sut = new StateImpl(Context, Option.None()); - var next = sut.GetType().GetField("_next", BindingFlags.Instance | BindingFlags.NonPublic)!.GetValue(sut)!; + var sub = sut.GetType().GetField("_subscription", BindingFlags.Instance | BindingFlags.NonPublic)!.GetValue(sut)!; + var src = sub.GetType().GetField("_source", BindingFlags.Instance | BindingFlags.NonPublic)!.GetValue(sub)!; + var node = src.GetType().GetField("_current", BindingFlags.Instance | BindingFlags.NonPublic)!.GetValue(src)!; + var next = node.GetType().GetField("_next", BindingFlags.Instance | BindingFlags.NonPublic)!.GetValue(node)!; var task = (Task)next.GetType().GetProperty("Task")!.GetValue(next)!; task.CreationOptions diff --git a/src/Uno.Extensions.Reactive.Tests/Given_ListFeed.cs b/src/Uno.Extensions.Reactive.Tests/Given_ListFeed.cs index faded33b0b..2d2a906c39 100644 --- a/src/Uno.Extensions.Reactive.Tests/Given_ListFeed.cs +++ b/src/Uno.Extensions.Reactive.Tests/Given_ListFeed.cs @@ -43,6 +43,8 @@ public async Task When_AsyncEnumerable() { async IAsyncEnumerable> GetSource() { + await Task.Yield(); // Make sure to run async, so listener will receive all messages. + yield return new[] { 40, 41, 42 }.ToImmutableList(); yield return new[] { 41, 42, 43 }.ToImmutableList(); yield return new[] { 42, 43, 44 }.ToImmutableList(); @@ -64,6 +66,8 @@ public async Task When_Create() { async IAsyncEnumerable>> GetSource([EnumeratorCancellation] CancellationToken ct) { + await Task.Yield(); // Make sure to run async, so listener will receive all messages. + var msg = Message>.Initial; yield return msg = msg.With().Data(new[] { 40, 41, 42 }.ToImmutableList()); diff --git a/src/Uno.Extensions.Reactive.Tests/Operators/Given_CombineFeed.cs b/src/Uno.Extensions.Reactive.Tests/Operators/Given_CombineFeed.cs index 7ab7004be8..99692c3232 100644 --- a/src/Uno.Extensions.Reactive.Tests/Operators/Given_CombineFeed.cs +++ b/src/Uno.Extensions.Reactive.Tests/Operators/Given_CombineFeed.cs @@ -14,8 +14,8 @@ public class Given_CombineFeed : FeedTests [TestMethod] public async Task When_Combine2() { - var feed1 = new StateImpl(Option.Undefined()); - var feed2 = new StateImpl(Option.Undefined()); + var feed1 = new StateImpl(Context, Option.Undefined()); + var feed2 = new StateImpl(Context, Option.Undefined()); var sut = Feed.Combine(feed1, feed2).Record(); diff --git a/src/Uno.Extensions.Reactive.Tests/Operators/Given_FeedToListFeedAdapter.cs b/src/Uno.Extensions.Reactive.Tests/Operators/Given_FeedToListFeedAdapter.cs index 6c0e602e6e..4df4d13773 100644 --- a/src/Uno.Extensions.Reactive.Tests/Operators/Given_FeedToListFeedAdapter.cs +++ b/src/Uno.Extensions.Reactive.Tests/Operators/Given_FeedToListFeedAdapter.cs @@ -49,6 +49,8 @@ public async Task When_KeyEquatableNoComparerAndUpdate_Then_TrackItemsUsingKeyEq async IAsyncEnumerable> GetSource([EnumeratorCancellation] CancellationToken ct = default) { + await Task.Yield(); // Make sure to run async, so listener will receive all messages. + yield return original.ToImmutableList(); yield return updated.ToImmutableList(); } @@ -77,6 +79,8 @@ public async Task When_NotKeyEquatableNoComparerAndUpdate_Then_TrackItemsUsingKe async IAsyncEnumerable> GetSource([EnumeratorCancellation] CancellationToken ct = default) { + await Task.Yield(); // Make sure to run async, so listener will receive all messages. + yield return original.ToImmutableList(); yield return updated.ToImmutableList(); } @@ -105,6 +109,8 @@ public async Task When_ClassNoComparerAndUpdate_Then_TrackItemsUsingKeyEquality( async IAsyncEnumerable> GetSource([EnumeratorCancellation] CancellationToken ct = default) { + await Task.Yield(); // Make sure to run async, so listener will receive all messages. + yield return original.ToImmutableList(); yield return updated.ToImmutableList(); } diff --git a/src/Uno.Extensions.Reactive.Tests/Operators/Given_ListFeedSelection.cs b/src/Uno.Extensions.Reactive.Tests/Operators/Given_ListFeedSelection.cs index 9236d01ca8..58bf1c8c9d 100644 --- a/src/Uno.Extensions.Reactive.Tests/Operators/Given_ListFeedSelection.cs +++ b/src/Uno.Extensions.Reactive.Tests/Operators/Given_ListFeedSelection.cs @@ -203,7 +203,7 @@ await selection.Should().BeAsync(r => r } [TestMethod] - public async Task When_Single_With_InvalidInitial_And_UpdateSutStateInvalid_Then_ListStateUpdated() + public async Task When_Single_With_InvalidInitial_And_UpdateSutStateInvalid_Then_ListStateNotUpdated() { var selection = State.Value(this, () => 15).Record(); var list = ListState.Async(this, async _ => Enumerable.Range(0, 10).ToImmutableList()); @@ -248,7 +248,7 @@ await selection.Should().BeAsync(r => r } [TestMethod] - public async Task When_Single_With_ValidInitial_And_UpdateSutStateInvalid_Then_ListStateUpdated() + public async Task When_Single_With_ValidInitial_And_UpdateSutStateInvalid_Then_ListStateNotUpdated() { var selection = State.Value(this, () => 1).Record(); var list = ListState.Async(this, async _ => Enumerable.Range(0, 10).ToImmutableList()); @@ -458,7 +458,7 @@ await selection.Should().BeAsync(r => r } [TestMethod] - public async Task When_Multiple_With_InvalidInitial_And_UpdateSutStateInvalid_Then_ListStateUpdated() + public async Task When_Multiple_With_InvalidInitial_And_UpdateSutStateInvalid_Then_ListStateNotUpdated() { var selection = State.Value(this, () => ImmutableList.Create(15) as IImmutableList).Record(); var list = ListState.Async(this, async _ => Enumerable.Range(0, 10).ToImmutableList()); @@ -503,7 +503,7 @@ await selection.Should().BeAsync(r => r } [TestMethod] - public async Task When_Multiple_With_ValidInitial_And_UpdateSutStateInvalid_Then_ListStateUpdated() + public async Task When_Multiple_With_ValidInitial_And_UpdateSutStateInvalid_Then_ListStateNotUpdated() { var selection = State.Value(this, () => ImmutableList.Create(1) as IImmutableList).Record(); var list = ListState.Async(this, async _ => Enumerable.Range(0, 10).ToImmutableList()); @@ -722,7 +722,7 @@ await selection.Should().BeAsync(r => r } [TestMethod] - public async Task When_ProjectedSingle_With_InvalidInitial_And_UpdateSutStateInvalid_Then_ListStateUpdated() + public async Task When_ProjectedSingle_With_InvalidInitial_And_UpdateSutStateInvalid_Then_ListStateNotUpdated() { var selection = State.Value(this, () => new MyAggregateRoot(15)).Record(); var items = Enumerable.Range(0, 10).Select(i => new MyEntity(i)).ToImmutableList(); @@ -769,7 +769,7 @@ await selection.Should().BeAsync(r => r } [TestMethod] - public async Task When_ProjectedSingle_With_ValidInitial_And_UpdateSutStateInvalid_Then_ListStateUpdated() + public async Task When_ProjectedSingle_With_ValidInitial_And_UpdateSutStateInvalid_Then_ListStateNotUpdated() { var selection = State.Value(this, () => new MyAggregateRoot(1)).Record(); var items = Enumerable.Range(0, 10).Select(i => new MyEntity(i)).ToImmutableList(); diff --git a/src/Uno.Extensions.Reactive/Core/Internal/FeedSubscription.cs b/src/Uno.Extensions.Reactive/Core/Internal/FeedSubscription.cs index 7de6330517..e026121eab 100644 --- a/src/Uno.Extensions.Reactive/Core/Internal/FeedSubscription.cs +++ b/src/Uno.Extensions.Reactive/Core/Internal/FeedSubscription.cs @@ -12,18 +12,6 @@ namespace Uno.Extensions.Reactive.Core; -internal class FeedSubscription -{ - /// - /// Determines if we allow FeedSubscription to bypass multiple initial sync values (cf. remarks for more details). - /// - /// - /// This is almost only a test case, but if a source feeds enumerates multiple values at startup (i.e. in the GetSource), - /// the which backs the might miss some of those values to replay only the last one. - /// - public static bool IsInitialSyncValuesSkippingAllowed { get; set; } = true; -} - internal class FeedSubscription : IAsyncDisposable, ISourceContextOwner { private readonly ISignal> _feed; @@ -39,7 +27,7 @@ public FeedSubscription(ISignal> feed, SourceContext rootContext) _context = rootContext.CreateChild(this, _requests); _source = new ReplayOneAsyncEnumerable>( feed.GetSource(_context), - isInitialSyncValuesSkippingAllowed: FeedSubscription.IsInitialSyncValuesSkippingAllowed); + isInitialSyncValuesSkippingAllowed: true); } string ISourceContextOwner.Name => $"Sub on '{_feed}' for ctx '{_context.Parent!.Owner.Name}'."; diff --git a/src/Uno.Extensions.Reactive/Utils/AsyncEnumerables/AsyncEnumerableSubject.cs b/src/Uno.Extensions.Reactive/Utils/AsyncEnumerables/AsyncEnumerableSubject.cs index f4888d5f44..fb125a0677 100644 --- a/src/Uno.Extensions.Reactive/Utils/AsyncEnumerables/AsyncEnumerableSubject.cs +++ b/src/Uno.Extensions.Reactive/Utils/AsyncEnumerables/AsyncEnumerableSubject.cs @@ -41,7 +41,7 @@ public void Complete() => MoveNext(false, default, error: null, mightHaveNext: false); public void Fail(Exception error) - => MoveNext(false, default, error: null, mightHaveNext: false); + => MoveNext(false, default, error: error, mightHaveNext: false); public void TrySetNext(T item) => MoveNext(true, item, error: null, mightHaveNext: true, throwOnError: false); @@ -53,7 +53,7 @@ public void TryComplete() => MoveNext(false, default, error: null, mightHaveNext: false, throwOnError: false); public void TryFail(Exception error) - => MoveNext(false, default, error: null, mightHaveNext: false, throwOnError: false); + => MoveNext(false, default, error: error, mightHaveNext: false, throwOnError: false); private void MoveNext(bool hasValue, T? value, Exception? error = null, bool mightHaveNext = true, bool throwOnError = true) From d796e1bcffe5e764c6eb94d3440b3f3b2d1dff49 Mon Sep 17 00:00:00 2001 From: David Date: Mon, 17 Jul 2023 09:29:52 -0400 Subject: [PATCH 7/7] chore: Fix build by disabling implicit usings --- .../Uno.Extensions.Reactive.Generator.csproj | 1 + src/Uno.Extensions.Reactive.UI/common.props | 1 + src/Uno.Extensions.Reactive/Core/Internal/StateImpl.cs | 3 +++ src/Uno.Extensions.Reactive/Uno.Extensions.Reactive.csproj | 1 + 4 files changed, 6 insertions(+) diff --git a/src/Uno.Extensions.Reactive.Generator/Uno.Extensions.Reactive.Generator.csproj b/src/Uno.Extensions.Reactive.Generator/Uno.Extensions.Reactive.Generator.csproj index 4740942dcc..b4948e9c07 100644 --- a/src/Uno.Extensions.Reactive.Generator/Uno.Extensions.Reactive.Generator.csproj +++ b/src/Uno.Extensions.Reactive.Generator/Uno.Extensions.Reactive.Generator.csproj @@ -3,6 +3,7 @@ netstandard2.0 false + false True + false