PersistentStreamPullingAgent skips over the message under a certain condition #9023

Open
tchelidze opened this issue May 24, 2024 · 22 comments

@tchelidze

tchelidze commented May 24, 2024

Problem:

When publishing a message under the following conditions:

  • using MemoryStreams
  • the stream is inactive, meaning it is not in PersistentStreamPullingAgent.pubSubCache
  • the message cache is empty, meaning PersistentStreamPullingAgent.queueCache has no messages in it

the published message is lost.

How to reproduce:

Here is a link to a GitHub repo demonstrating the issue: https://github.com/tchelidze/Orleans_MemoryStream_LostMessage/tree/master/Orleans_MemoryStream_LostMessage

Analysis:

Consider the following scenario: we publish message number 1 to the stream. Then we wait, and in the meantime the stream goes inactive and the message cache gets purged. After that we publish messages number 2 and 3 to the stream. Then the following happens.

Inside the PersistentStreamPullingAgent.DoHandshakeWithConsumer method (which gets called from RegisterAsStreamProducer; remember, the stream is inactive) we retrieve the last processed message token from the consumer. In our case that would be message number 1. We then take that token (pointing to message number 1) and call queueCache.GetCacheCursor, passing that token.

What GetCacheCursor does is where the problem lies, specifically PooledQueueCache.SetCursor, where it checks whether the oldest message is past the given token. The oldest message in our case would be message number 2, while the token is message number 1, so the if statement on line 201 is executed. Then the interesting part comes: we check whether lastPurgedToken is the same as or past the given token. lastPurgedToken again points to message number 1, because that was the last message evicted from the cache. So that if statement also executes, and PooledQueueCache.SetCursor sets the SequenceToken to the oldest message, which is message number 2.

Issue number 1:
As I understand it, lastPurgedToken points to a message that was evicted and is no longer in the cache, so checking sequenceToken.CompareTo(entry.Token) >= 0 does not seem correct here; I think it should be sequenceToken.CompareTo(entry.Token) > 0 instead.
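
Purely for illustration, here is a minimal sketch of the scenario above, with plain sequence numbers standing in for the stream sequence tokens (this is not the actual Orleans source):

// Token positions from the scenario above, represented as plain sequence numbers.
long requestedToken = 1;    // last message the consumer processed (already purged from the cache)
long lastPurgedToken = 1;   // newest message that was evicted from the cache
long oldestCachedToken = 2; // oldest message still in the cache, never delivered to the consumer

// Current check: ">= 0" includes the equality case, so the cursor is set to the oldest cached
// message (2), and the unconditional MoveNext() in DoHandshakeWithConsumer then skips it.
bool currentBranchTaken = requestedToken.CompareTo(lastPurgedToken) >= 0;  // true

// Proposed check: strict ">" excludes the equality case, so message 2 would not be treated as
// already delivered.
bool proposedBranchTaken = requestedToken.CompareTo(lastPurgedToken) > 0;  // false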

Story continues.

So, back to PersistentStreamPullingAgent.DoHandshakeWithConsumer, line 315. Here the expectation is that queueCache.GetCacheCursor gives us back a cursor that points to the last processed message, but because queueCache no longer has that message (message number 1), it returns a cursor pointing to the oldest message, which in our case is message number 2. On line 315 we move the cursor forward (because, remember, the expectation was that the cursor pointed to the last processed message). As a result, the cursor now points to message number 3, and that is how message number 2 is lost.

Issue number 2:

I think that in PersistentStreamPullingAgent.DoHandshakeWithConsumer, instead of blindly moving the cursor to the next entry, we should check whether it points to the same position as requestedHandshakeToken, and if it does not, we should not move it forward.
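
For illustration, a sketch of that guard written against the public cursor interfaces (the helper name and surrounding wiring are assumptions, not the actual Orleans source):

using Orleans.Runtime;
using Orleans.Streams;

internal static class HandshakeCursorSketch
{
    // Only skip the first cached entry when it really is the message the consumer already processed.
    public static IQueueCacheCursor Create(
        IQueueCache queueCache, StreamId streamId, StreamSequenceToken requestedHandshakeToken)
    {
        IQueueCacheCursor cursor = queueCache.GetCacheCursor(streamId, requestedHandshakeToken);
        if (requestedHandshakeToken is null || !cursor.MoveNext())
        {
            return cursor; // no handshake token, or nothing cached yet: nothing to skip
        }

        IBatchContainer first = cursor.GetCurrent(out _);
        if (first.SequenceToken.Equals(requestedHandshakeToken))
        {
            // The first cached entry is the message the consumer already processed, so leaving
            // the cursor advanced past it gives exactly the de-duplication that #7699 intended.
            return cursor;
        }

        // The last processed message was purged, so the cursor landed on a message the consumer
        // has never seen (message number 2 in the scenario above). Recreate the cursor at that
        // message so delivery starts from it instead of skipping it.
        return queueCache.GetCacheCursor(streamId, first.SequenceToken);
    }
}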

Workaround

The only workaround I can think of is to set StreamPullingAgentOptions.StreamInactivityPeriod and StreamCacheEvictionOptions.DataMaxAgeInCache to very high values, to avoid the scenario where queueCache is empty while the stream is inactive.
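
For example, a minimal sketch (the provider name "StreamProvider" is an assumption, as is the named-options wiring keyed by the provider name; pick values larger than the longest expected gap between messages):

siloBuilder.AddMemoryStreams<DefaultMemoryMessageBodySerializer>("StreamProvider");

// Keep the pulling agent's stream registration alive and keep messages cached for much longer
// than the expected gap between messages, so the "inactive stream + empty cache" handshake path
// described above is never hit.
siloBuilder.Services.Configure<StreamPullingAgentOptions>("StreamProvider", options =>
{
    options.StreamInactivityPeriod = TimeSpan.FromDays(1);
});
siloBuilder.Services.Configure<StreamCacheEvictionOptions>("StreamProvider", options =>
{
    options.DataMaxAgeInCache = TimeSpan.FromDays(1);
    options.DataMinTimeInCache = TimeSpan.FromHours(12); // must stay below DataMaxAgeInCache
});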

Thoughts ?

@benjaminpetit self-assigned this May 30, 2024
@tchelidze
Author

Any updates?

@davidvanchu

@benjaminpetit It seems like our fix for preventing duplicate event delivery (https://github.com/dotnet/orleans/pull/7699/files from #7686) is inadvertently causing messages to be skipped. We recently upgraded to Orleans 8 from 3.x, and are seeing this issue as well, although we are using the Event Hub stream provider, not MemoryStreams.

@davidvanchu

davidvanchu commented Aug 29, 2024

I've played around with the tests in Orleans and I've been able to reproduce the issue here:
main...davidvanchu:orleans:v810_fix_stream_skip

The MemoryStreamResumeTests will show a skipped message after waiting for the stream to become inactive and/or after deactivating the grain.

If you remove the MoveNext()s added in https://github.com/dotnet/orleans/pull/7699/files, the tests pass. And if you uncomment the //Assert.Equal(new List<int>() { 1, 2, 3, 4, 5 }, grainInts); and re-run the test, you can see that after removing the MoveNexts there are some duplicates in the grainInts array, but that is better than skipping messages.

Before removing the MoveNext()s: (screenshot of test output)
After removing the MoveNext()s: (screenshot of test output)

@fjoly-hilo-energie

Is there any update on this? The risk of skipped messages is the reason we're still not using the Streams feature.

@davidvanchu

@ReubenBond Can we get an official fix in for this? We have been using a forked version of Orleans with this fix in place since we discovered the issue in late August. If you would like me to create a PR for this, I am happy to; please confirm that the fix in my branch linked above makes sense from your perspective.

@ReubenBond
Member

cc @benjaminpetit

@Costo
Contributor

Costo commented Nov 12, 2024

Hi,
I think we are seeing the same issue with Event Hubs streams. We have grains representing locations, and every hour a message is published to each location (energy consumption) via an implicit stream.
Yesterday we increased the grain collection age from 15 minutes (the default) to 2 hours in order to avoid unnecessary traffic on the grain storage account every hour.
Now we are seeing millions of QueueCacheMissExceptions, and the majority of the messages are lost (not received by grains).

@oising
Contributor

oising commented Nov 13, 2024

> Hi, I think we are seeing the same issue with Event Hubs streams. We have grains representing locations, and every hour a message is published to each location (energy consumption) via an implicit stream. Yesterday we increased the grain collection age from 15 minutes (the default) to 2 hours in order to avoid unnecessary traffic on the grain storage account every hour. Now we are seeing millions of QueueCacheMissExceptions, and the majority of the messages are lost (not received by grains).

This is not the same issue. A queue cache miss means that your grains are out-living the lifetime of the cache (roughly speaking). This isn't intuitive, unfortunately. The way Orleans figures out whether you may have missed a message is that it tries to keep track of the last message you processed in the cache. It needs to see that last message when it receives a new message, to know that there haven't been any intermediate messages (that you may have missed). It tracks the last message for as long as the stream is active for a given grain observer. Now that your grains are living for up to two hours, when a grain receives a new message on the hour, the message from an hour ago has long since been purged from the cache. So Orleans will now throw the queue cache miss exception, which actually means "I have no idea if I missed a message, because the last message I processed is no longer visible [in the cache]." It's just a warning, but a very spammy one at that.

In short, the timespan in MetadataMinTimeInCache must always be larger than the grain collection age for the type of grain that is consuming the stream. So, if your grains are now living for up to two hours, you need to keep metadata for, let's say, 2h15m to be safe. Metadata is used for tracking the n-1 message only and doesn't have a significant effect on cache memory usage. The other fields, DataMaxAgeInCache and DataMinTimeInCache, control the window for rewinding and do have a significant effect on cache memory usage.
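
For illustration, a sketch of that relationship in configuration (the provider name "sessions" and the named-options wiring are assumptions; adjust to however your provider is registered):

// Inside the UseOrleans callback: grains may now stay idle for up to 2 hours before collection.
siloBuilder.Configure<GrainCollectionOptions>(options =>
{
    options.CollectionAge = TimeSpan.FromHours(2);
});

// Keep stream metadata (the n-1 message tracking) around longer than the grain collection age.
// The data window can stay small, since it is what drives cache memory usage.
siloBuilder.Services.Configure<StreamCacheEvictionOptions>("sessions", options =>
{
    options.MetadataMinTimeInCache = TimeSpan.FromHours(2.25);
    options.DataMaxAgeInCache = TimeSpan.FromMinutes(30);
    options.DataMinTimeInCache = TimeSpan.FromMinutes(15);
});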

Before, when your grains only lived 15 minutes, the stream cursor would be deallocated and the streaming runtime would "forget" about the last message received before the next one could arrive 45 minutes later.

This kind of problem is unique to the "rewindable cache" model used by advanced streaming providers like Event Hubs. Orleans reads all partitions into silo-specific memory caches so that individual grains can rewind the stream without blocking other grains, as they would if they actually rewound into the partition directly. There is one cache per agent, one agent per partition, and possibly multiple agents per silo.

@Costo
Contributor

Costo commented Nov 13, 2024

Hi Oisin,

I know that QueueCacheMissExceptions used to be warnings and the messages were delivered anyway.
But the new behavior is that the messages are dropped. Maybe changing the MetadataMinTimeInCache value will fix the problem, but Orleans should not drop messages in the first place.

@oising
Contributor

oising commented Nov 13, 2024

> Hi Oisin,
>
> I know that QueueCacheMissExceptions used to be warnings and the messages were delivered anyway.
> But the new behavior is that the messages are dropped. Maybe changing the MetadataMinTimeInCache value will fix the problem, but Orleans should not drop messages in the first place.

Oh, I missed that part. I don't recall any new behaviour that should drop messages. That's quite odd and definitely sounds like a bug somewhere. If it is this issue, then yes, you may be able to avoid triggering it.

@Costo
Contributor

Costo commented Jan 8, 2025

We continue to experience this problem. Here's a recap of what we did so far:

Since our streams receive 1 message per hour, we increased grain collection age from 15 minutes to 2 hours in order to avoid unnecessary traffic on grain storage account every hour. This resulted in the loss of most messages.

We deactivated cache metadata purging by setting MetadataMinTimeInCache to null. This seemed to fix the issue, but now the stream was deactivated after 30 minutes and there was a QueueCacheMissException every hour when a new message arrived and the stream was reactivated (DoHandshakeWithConsumer). I think these are fine (no message loss) but annoying.

I then increased the stream inactivity period from 30 minutes to 2 hours. Again, this resulted in the loss of most messages.

So apparently, when you receive messages infrequently, it helps to disable cache metadata purging, but the stream must be deactivated before the next message or it will likely be dropped.

Honestly, this gives me very little confidence in Orleans Streams at the moment. For new projects we went with an Event Hubs processor + Broadcast Channels instead of Orleans Streams. We lost some scalability but gained some peace of mind and reliability.

@oising
Contributor

oising commented Jan 9, 2025

@benjaminpetit 😳

@andrew-from-toronto

Following as well. We're seeing stream events with Event Hubs occasionally not being delivered to our consumer grains as expected. This seems to have started when we upgraded from v3 to v8, and it is still happening on the latest v9.

@egil
Contributor

egil commented Jan 18, 2025

I am seeing the QueueCacheMissException as well (and StreamEventDeliveryFailureException too):

Orleans.Streams.QueueCacheMissException: Item not found in cache.  Requested: EventHubSequenceToken(EventHubOffset: , SequenceNumber: 16460467, EventIndex: 0), Low: EventHubSequenceToken(EventHubOffset: , SequenceNumber: 16473074, EventIndex: 0), High: EventHubSequenceToken(EventHubOffset: , SequenceNumber: 16475834, EventIndex: 0)
   at Orleans.Providers.Streams.Common.PooledQueueCache.SetCursor(Cursor cursor, StreamSequenceToken sequenceToken) in /_/src/Orleans.Streaming/Common/PooledCache/PooledQueueCache.cs:line 215
   at Orleans.Streaming.EventHubs.EventHubAdapterReceiver.GetCacheCursor(StreamId streamId, StreamSequenceToken token) in /_/src/Azure/Orleans.Streaming.EventHubs/Providers/Streams/EventHub/EventHubAdapterReceiver.cs:line 216
   at Orleans.Streams.PersistentStreamPullingAgent.RunConsumerCursor(StreamConsumerData consumerData) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 606
Orleans.Streams.StreamEventDeliveryFailureException: Stream provider failed to deliver an event.  StreamProvider:sessions  Stream:sessions/de43e344e8ac4abd8ec2680143b64542

(This is the stack trace from the logs in Aspire; it is the same as the exception message.)

To help investigate, let me share my setup:

  • Stream source is Azure Event Hubs with an external producer of events. The event hub has 4 partitions and produces 25k events per hour.
  • Events are received using a custom data adapter.
  • Using .NET 9 with the 9.0.x version of all packages.
  • Running locally via Aspire with 4 replicas (silos).
  • There are about 5-10 exceptions every ten minutes; see the screenshot below. These are QueueCacheMissException and StreamEventDeliveryFailureException exceptions.

(screenshot: QueueCacheMissException and StreamEventDeliveryFailureException occurrences in the Aspire dashboard)

The silo configuration:

hostBuilder.AddKeyedAzureTableClient("clustering");
hostBuilder.AddKeyedAzureBlobClient("grain-state");
hostBuilder.UseOrleans(siloBuilder =>
{
    siloBuilder.Services.AddSingleton<SessionsEventAdapter>();
    siloBuilder.AddEventHubStreams(
        name: "sessions",
        (ISiloEventHubStreamConfigurator configurator) =>
        {
            configurator.ConfigureEventHub(builder => builder.Configure(options =>
            {
                options.ConfigureEventHubConnection(
                    GetCpmsEventHubFqdn(hostBuilder),
                    eventHubName: "sessions",
                    GetPricingEngineConsumerGroup(hostBuilder),
                    new DefaultAzureCredential(new DefaultAzureCredentialOptions { TenantId = TenantId }));
            }));

            configurator.ConfigurePartitionReceiver(configure =>
            {
                configure.Configure(options =>
                {
                    options.PrefetchCount = 100;
                });
            });

            configurator.UseDataAdapter((services, name) => services.GetRequiredService<SessionsEventAdapter>());
            configurator.UseAzureTableCheckpointer(
                builder => builder.Configure(options =>
                {
                    options.TableServiceClient = new TableServiceClient(
                        siloBuilder.Configuration.GetConnectionString("EventHubCheckpointStorage"),
                        options.ClientOptions);
                    options.PersistInterval = TimeSpan.FromSeconds(10);
                }));
            configurator.ConfigureStreamPubSub(StreamPubSubType.ImplicitOnly);
        });

    if (hostBuilder.Environment.IsDevelopment())
    {
        siloBuilder.UseDashboard(options =>
        {
            options.HostSelf = false;
        });
    }
});

// SessionsEventAdapter.cs
internal sealed class SessionsEventAdapter(Serializer serializer) : EventHubDataAdapter(serializer)
{
    public override string GetPartitionKey(StreamId streamId)
        => streamId.ToString();

    public override StreamId GetStreamIdentity(EventData queueMessage)
    {
        // When charging sessions are published, the sessions ID is passed as the partition key.
        // That means the Grain that should receive the charging session event is the one with the same ID.
        var sessionId = queueMessage.PartitionKey;
        return StreamId.Create(EventHubConstants.Sessions.EventHubName, sessionId);
    }

    public override EventData ToQueueMessage<T>(StreamId streamId, IEnumerable<T> events, StreamSequenceToken token, Dictionary<string, object> requestContext)
        => throw new NotSupportedException("This adapter only supports reading CPMS charging sessions.");

    protected override IBatchContainer GetBatchContainer(EventHubMessage eventHubMessage)
        => new SessionsBatchContainer(eventHubMessage);
}


// SessionsBatchContainer.cs
[GenerateSerializer, Immutable]
internal sealed class SessionsBatchContainer : IBatchContainer
{
    private static readonly JsonSerializerOptions SessionsJsonSerializerOptions = JsonSerializerOptionsFactory.Create();

    [Id(0)]
    private readonly EventHubMessage eventHubMessage;

    [Id(1)]
    public StreamSequenceToken SequenceToken { get; }

    public StreamId StreamId => eventHubMessage.StreamId;

    public SessionsBatchContainer(EventHubMessage eventHubMessage)
    {
        this.eventHubMessage = eventHubMessage;
        SequenceToken = new EventHubSequenceTokenV2(eventHubMessage.Offset, eventHubMessage.SequenceNumber, 0);
    }

    public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
    {
        try
        {
            if (JsonSerializer.Deserialize<T>(eventHubMessage.Payload, SessionsJsonSerializerOptions) is { } message)
            {
                return [Tuple.Create(message, SequenceToken)];                
            }
        }
        catch (Exception)
        {
            // Ignore payloads that fail to deserialize; the batch then yields no events.
        }

        return [];
    }

    public bool ImportRequestContext() => false;
}

Here is my grain:

[ImplicitStreamSubscription("sessions")]
internal class ChargingSessionGrain(
    [PersistentState("ChargingSession")] IPersistentState<ChargingSession> session,
    ILogger<ChargingSessionGrain> logger) : Grain, IChargingSessionGrain, IStreamSubscriptionObserver, IAsyncObserver<SessionEvent>
{
    public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
    {
        var handler = handleFactory.Create<SessionEvent>();
        await handler.ResumeAsync(this, session.State.Token);
    }

    public async Task OnNextAsync(SessionEvent item, StreamSequenceToken? token = null)
    {
        if (session.State.SessionId is not null && session.State.SessionId != item.SessionId)
        {
            logger.LogError($"Receives a message for a different Session ID. Current = {session.State.SessionId}. Received = {item.SessionId}.");
            return;
        }

        session.State = session.State with
        {
            SessionId = item.SessionId,
            EvseId = item.EvseId,
            ConsumedKwh = item.ConsumedKwh,
            LastEvent = item.Type,
            LastEventTimestamp = item.Timestamp,
            EventCount = session.State.EventCount + 1,
            Token = token
        };

        await session.WriteStateAsync();
    }

    public Task OnCompletedAsync()
    {
        logger.LogInformation("Stream completed for {EvseId}.", session.State.EvseId);
        return Task.CompletedTask;
    }

    public Task OnErrorAsync(Exception ex)
    {
        logger.LogError(ex, "An error occurred while processing the stream.");
        return Task.CompletedTask;
    }
}

Telemetry collected over 12 hours:

orleans-streams-persistent-stream-messages-read: 228707
orleans-streams-persistent-stream-messages-sent: 228809
orleans-streams-persistent-stream-pubsub-cache-size: +-1780 (stable around that number)
orleans-streams-queue-cache-length: +-12200 (stable around that number)

Let me know if other stats are interesting.

I let it run locally on my computer and collected stats/logs via the Aspire dashboard.

@benjaminpetit
Copy link
Member

I think there are different issues here.

I think the first analysis from @tchelidze makes sense, and we may be skipping events when the metadata cache is used. We have different "persistent" providers that don't behave the same, which causes this issue...

I also think some of the issues in this thread are caused by configuration mismatches; we should publish some guidance and add some config checks at runtime for that.

Metadata cache time > Streaming cache min time > Grain collection time (but if the grain is not called only by streaming, I wonder if it could trigger the issue you are encountering...)
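
As an illustration of such a runtime check, here is a sketch of a startup validator (the class, the threshold, and the provider name "sessions" are assumptions, not existing Orleans code). It could be registered with siloBuilder.Services.AddSingleton<IConfigurationValidator, StreamCacheAgeValidator>() so the silo fails fast at startup:

using Microsoft.Extensions.Options;
using Orleans;
using Orleans.Configuration;
using Orleans.Runtime;

// Sketch of a runtime configuration check: fail fast when the metadata cache window cannot cover
// the grain collection age, which is the mismatch discussed in this thread.
internal sealed class StreamCacheAgeValidator(
    IOptionsMonitor<StreamCacheEvictionOptions> cacheOptions,
    IOptions<GrainCollectionOptions> collectionOptions) : IConfigurationValidator
{
    public void ValidateConfiguration()
    {
        var cache = cacheOptions.Get("sessions"); // per-provider named options (assumed provider name)
        var collectionAge = collectionOptions.Value.CollectionAge;

        if (cache.MetadataMinTimeInCache is { } metadataAge && metadataAge <= collectionAge)
        {
            throw new OrleansConfigurationException(
                $"MetadataMinTimeInCache ({metadataAge}) should be larger than the grain collection age ({collectionAge}).");
        }
    }
}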

@Costo, @egil could you open a separate issue(s)?

@egil
Contributor

egil commented Jan 21, 2025

@benjaminpetit will do. @oising kindly provided input on my issue on Discord, and I definitely need more insight and understanding on how to configure streams for ingestion of events.

More guidance/docs and configuration validation would absolutely be great.

@Costo
Contributor

Costo commented Jan 22, 2025

> Metadata cache time > Streaming cache min time > Grain collection time (but if the grain is not called only by streaming, I wonder if it could trigger the issue you are encountering...)

In Orleans 3.x, the grain collection age was 2 or 3 hours by default and the cache min time was 30 minutes. It did not cause messages to be dropped. The grain collection age was changed to 15 minutes in Orleans 7.0.
It's not possible to ensure "streaming cache min time > grain collection time" unless some grains are used for streaming only and don't have any methods in their interface.

I'll create a separate issue if it helps.
Could we agree that Orleans Streams should never, under any circumstances, skip messages? I'm not talking about cache eviction or delivery failure, just new messages that arrive and are immediately skipped.

@egil
Contributor

egil commented Jan 22, 2025

> Could we agree that Orleans Streams should never, under any circumstances, skip messages? I'm not talking about cache eviction or delivery failure, just new messages that arrive and are immediately skipped.

💯

@andrew-from-toronto

> I think the first analysis from @tchelidze makes sense, and we may be skipping events when the metadata cache is used. We have different "persistent" providers that don't behave the same, which causes this issue...

Is the recommendation to disable the metadata cache for now, until this potential bug is fixed?

@Costo
Contributor

Costo commented Jan 27, 2025

I created a new issue for a similar bug when MetadataMinTimeInCache is null: #9297
@benjaminpetit
@andrew-from-toronto

@Costo
Contributor

Costo commented Jan 27, 2025

Hi @benjaminpetit,

I opened a separate issue for my Event Hubs streaming problem: #9299
I think it is the same issue as this one.

@ikkentim
Contributor

This can be closed as resolved I think? (#9336)
