AK3000: Custom Akka.Streams OutGraphStageLogic must call SetHandler for OUT port #93

Open
Aaronontheweb opened this issue Apr 16, 2024 · 0 comments
Labels
AK2000 API usage rules


Is your feature request related to a problem? Please describe.

This is a tad tricky because it depends very much on the type of base class the stage uses, but take this source stage (a GraphStage<SourceShape<ushort>>) for instance:

// -----------------------------------------------------------------------
// <copyright file="PacketIdSource.cs" company="Petabridge, LLC">
//      Copyright (C) 2024 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using Akka;
using Akka.Streams;
using Akka.Streams.Stage;

namespace TurboMqtt.Core.Streams;

// create a custom Source<ushort, NotUsed> that generates unique, non-zero packet IDs
// for each message that needs to be sent out over the wire
internal sealed class PacketIdSource : GraphStage<SourceShape<ushort>>
{
    public PacketIdSource()
    {
        Shape = new SourceShape<ushort>(Out);
    }

    public Outlet<ushort> Out { get; } = new("PacketIdSource.Out");

    public override SourceShape<ushort> Shape { get; }

    private sealed class Logic(PacketIdSource source) : OutGraphStageLogic(source.Shape) 
    {
        private ushort _currentId = 0;

        public override void OnPull()
        {
            // ensure that we wrap around
            if (_currentId == ushort.MaxValue)
            {
                _currentId = 0;
            }

            // guarantees that we never hit zero
            Push(source.Out, ++_currentId);
        }
    }

    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
    {
        return new Logic(this);
    }
}

The Logic in this case doesn't call SetHandler anywhere, so this stream stage will blow up as soon as it runs for the first time.
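
For comparison, here is a minimal sketch of the same stage with the handler registered. It assumes the fix is simply to swap the primary constructor for a regular one so that SetHandler can be called in the constructor body; the `_source` field is introduced only for this sketch, and the namespace and file header are left out for brevity.

```csharp
using Akka.Streams;
using Akka.Streams.Stage;

// Sketch only: same stage as above, but the Logic registers itself
// as the handler for the OUT port.
internal sealed class PacketIdSource : GraphStage<SourceShape<ushort>>
{
    public PacketIdSource()
    {
        Shape = new SourceShape<ushort>(Out);
    }

    public Outlet<ushort> Out { get; } = new("PacketIdSource.Out");

    public override SourceShape<ushort> Shape { get; }

    private sealed class Logic : OutGraphStageLogic
    {
        // field added for this sketch; replaces the primary-constructor capture
        private readonly PacketIdSource _source;
        private ushort _currentId = 0;

        public Logic(PacketIdSource source) : base(source.Shape)
        {
            _source = source;

            // the call the original stage never makes
            SetHandler(source.Out, this);
        }

        public override void OnPull()
        {
            // ensure that we wrap around
            if (_currentId == ushort.MaxValue)
            {
                _currentId = 0;
            }

            // guarantees that we never hit zero
            Push(_source.Out, ++_currentId);
        }
    }

    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
    {
        return new Logic(this);
    }
}
```

An analyzer for the source and sink case would presumably look for exactly this shape: a class deriving from OutGraphStageLogic (or its inlet counterpart) with no SetHandler invocation anywhere in its body.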

Describe the solution you'd like

Depending on the type of Logic used, we should ensure that SetHandler is called somewhere, either in the constructor or in one of the other methods invoked inside the stage logic.

  • For Sinks and Sources, we only need to call SetHandler once
  • For Flow stages, we need to ensure it's called twice (once for the inlet and once for the outlet)
  • For Fan-in and Fan-out stages this is more complicated and probably can't be enforced via a rule, other than ensuring that SetHandler is called AT LEAST ONCE somewhere
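
To illustrate the "called twice" case for Flow stages, here is a rough sketch of a pass-through flow that sets one handler per port. `PacketIdPassThrough` is a hypothetical stage made up for this example (it is not part of TurboMqtt), and it uses the lambda-based SetHandler overloads on a plain GraphStageLogic rather than a handler base class.

```csharp
using Akka.Streams;
using Akka.Streams.Stage;

// Hypothetical stage for illustration only: forwards ushort IDs unchanged.
internal sealed class PacketIdPassThrough : GraphStage<FlowShape<ushort, ushort>>
{
    public PacketIdPassThrough()
    {
        Shape = new FlowShape<ushort, ushort>(In, Out);
    }

    public Inlet<ushort> In { get; } = new("PacketIdPassThrough.In");

    public Outlet<ushort> Out { get; } = new("PacketIdPassThrough.Out");

    public override FlowShape<ushort, ushort> Shape { get; }

    private sealed class Logic : GraphStageLogic
    {
        public Logic(PacketIdPassThrough stage) : base(stage.Shape)
        {
            // first SetHandler call: the IN port forwards each element downstream
            SetHandler(stage.In, onPush: () => Push(stage.Out, Grab(stage.In)));

            // second SetHandler call: the OUT port propagates demand upstream
            SetHandler(stage.Out, onPull: () => Pull(stage.In));
        }
    }

    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
    {
        return new Logic(this);
    }
}
```

A rule for this case would need to count SetHandler calls against the ports declared on the FlowShape, which is why the fan-in and fan-out cases are harder: the number of ports is not fixed by the shape type alone.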

Aaronontheweb added the AK2000 API usage rules label Apr 16, 2024