-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding safety to Amqp Session operations, SessionCache to cache ReactorSession instances and integrating RequestResponseChannelCache #39107
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
API change check API changes are not detected in this pull request. |
2b1b3df
to
7fa63ae
Compare
28c5ed6
to
986ffd0
Compare
8dae9a2
to
433ef08
Compare
3d3e574
to
cd6ecec
Compare
cd6ecec
to
6a688ad
Compare
lmolkova
approved these changes
May 31, 2024
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ProtonSession.java
Outdated
Show resolved
Hide resolved
224d080
to
28cec5b
Compare
anuchandy
commented
May 31, 2024
...-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannelCache.java
Show resolved
Hide resolved
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java
Show resolved
Hide resolved
28cec5b
to
838aafd
Compare
conniey
approved these changes
Jun 3, 2024
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few comments. THank you!
sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java
Outdated
Show resolved
Hide resolved
...re/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSessionCache.java
Show resolved
Hide resolved
...zure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionCacheTest.java
Outdated
Show resolved
Hide resolved
...zure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionCacheTest.java
Show resolved
Hide resolved
838aafd
to
3767e09
Compare
0ef597c
to
155a3b9
Compare
155a3b9
to
7d63eb0
Compare
da01ad4
to
f23b2d8
Compare
/azp run java - servicebus - ci |
Azure Pipelines successfully started running 1 pipeline(s). |
fa35aac
to
33dd3ee
Compare
…orSession instances and integrating RequestResponseChannelCache
33dd3ee
to
fe39052
Compare
This was referenced Sep 11, 2024
[BUG] NullPointerException in Apache Qpid Proton-J during Azure Service Bus AMQP Reconnection
#41865
Closed
jairmyree
pushed a commit
to jairmyree/azure-sdk-for-java
that referenced
this pull request
Sep 23, 2024
…orSession instances and integrating RequestResponseChannelCache (Azure#39107)
3 tasks
3 tasks
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR has the following changes –
ReactorSessionCache
At the moment, we "create and open" the Qpid AMQP session outside of the Reactor Executor. We need a design to address this, additionally given we cache the sessions, we need to ensure 3 things -
A new implementation type "ReactorSessionCache" has been introduced to azure-core-amqp, to improve the cache functionalities readable, testable and to abstract the above 3 goals.
The "ReactorConnection" type composes this "ReactorSessionCache".
ProtonSession
Currently the ReactorSession "open" the internal Qpid session in the ReactorSession::Ctr, which potentially happens outside Reactor Executor (discussed in the previous section). Additionally, the RequestResponseChannel direct access the internal Qpid session that ReactorSession composes and "create and open" Sender and Receiver on it, this "create and open" by RequestResponseChannel is also not guaranteed to always happen in Reactor Executor.
A new implementation type "ProtonSession" has been introduced to abstract away direct operations in "Qpid session" that address the above safety concerns, the ReactorSession will contain an instance of "ProtonSession" and won’t expose it outside. Since the session can be accessed (not modified) from cache outside of Reactor Executor but can also concurrently modified, e.g., disposed from Reactor Executor, the "ProtonSession" stores the internal Qpid session in atomic reference. Keeping all lower-level operations in "ProtonSession" also makes it independently testable.
Session IO operations are never in the hot path, but at the time of connection shutdown and recovery, multiple threads can race for session, so safety is important here.
ProtonSession.ProtonChannel
Nested type of ProtonSession holding a "sender and receiver links pair" for bi-directional communication. The RequestResponseChannel will use this type to facilitate communications for cbs and management.
RequestResponseChannelCache
We already had "RequestResponseChannelCache" that follows the design principles of V2 "ReactorConnectionCache", but this type was not wired. These changes integrate this new cache into the execution flow.
Temporary wrapper types
As usual, 2 temporary wrapper types are added for side-by-side V1, V2, which will be deleted upon V1 removal.
opt-in flag
For the piloting phase, an opt-in flag has been added "com.azure.core.amqp.cache", this enables two cache routes "ReactorSessionCache" and "RequestResponseChannelCache".
These caches are enabled by default when application opt-in for Event Hubs V2 stack (PR) via "com.azure.messaging.eventhubs.v2".
For Service Bus, these caches require "com.azure.core.amqp.cache" opt-in. Given the heavy use of Request Response channels in Service Bus, this is undergoing additional testing.