Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messaging: v2 stack amqp-core types and integration to Service Bus #34854

Merged
merged 164 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
164 commits
Select commit Hold shift + click to select a range
6228bc0
Adding new stack (connection recovery and receiver) into amqp-core
anuchandy Mar 11, 2023
f0c817b
Unit tests for new amqp-core stack (connection recovery and receiver)
anuchandy Mar 11, 2023
776d817
Minimal change in EH to use new AmqpLinkProvider to create links
anuchandy Mar 11, 2023
74dd656
Initial integration of SB with new amqp-core stack (connection recove…
anuchandy Mar 11, 2023
648231d
Unit test for SB Sender Async Client recoveries (uses new connection …
anuchandy Mar 11, 2023
10a4037
Make the impl package type CreditFlowMode public.
anuchandy Mar 11, 2023
0accbf6
Using MessageFlux in AsyncReceiver when legacy stack is disabled
anuchandy Mar 11, 2023
8611a02
Adding nested private type in Builder to read and use the opt-out con…
anuchandy Mar 14, 2023
46af142
[amqp-core logging cleanup] Excluding null error condition and descri…
anuchandy Mar 25, 2023
5bbec2a
ReactorSession: Ensure it closes Link of type ReactorReceiver2
anuchandy Mar 25, 2023
1845281
Updating ServiceBusMessageSerializer to handle new MessageWithDeliver…
anuchandy Mar 25, 2023
4a8899d
MessageFlux: Removing trackingId from logs
anuchandy Mar 25, 2023
6a2fa1d
Rename AmqpReceiveLink::scheduleCredit(Supplier) to addCredit(Supplier)
anuchandy May 4, 2023
842cc6b
MessageFlux doc improvements, remove un-need return
anuchandy May 4, 2023
f861634
Enable sync receive on the new stack and make it opt-in
anuchandy May 4, 2023
9ceac11
NonSessionProcessor to pump from session-unaware entity (using new st…
anuchandy May 4, 2023
6df93c2
SB ManagementChannel: Directly log lock-token instead of logging it a…
anuchandy May 4, 2023
f7fc51c
Revert changes in ServiceBusProcessorClient did for testing
anuchandy May 4, 2023
bd070df
Correcting ServiceBusSenderAsyncClientTest to use connection-wrapper …
anuchandy May 8, 2023
340609e
Improve readability: Removing onEndpointTerminalState method - The ne…
anuchandy May 8, 2023
38dca25
Remove setEmptyCreditListener from v2 stack, the MessageFlux never us…
anuchandy May 9, 2023
2458609
address checkstyle warnings
anuchandy May 9, 2023
abb2785
Enable v2 stack in ReactorReceiver and ServiceBusReactorReceiver
anuchandy May 9, 2023
4022d92
Given ReactorReceiver and ServiceBusReactorReceiver now has v2 stack …
anuchandy May 9, 2023
5ce51e5
Use the term v1,v2 Stack consistently (remove all legacy,new stack te…
anuchandy May 9, 2023
a25ab3d
doc updates
anuchandy May 9, 2023
966358c
MessageFlux: Remove casting, improve redability by grouping variables…
anuchandy May 9, 2023
bcf99c6
Improve CreditAccounting java-doc by including thread-safety and visi…
anuchandy May 9, 2023
062048f
correct track2-perf compilation
anuchandy May 9, 2023
dc350af
Java-doc note on ISE in ReactorConnectionCache
anuchandy May 10, 2023
72712d0
java-doc update ReactorConnectionCache & ReactorReceiver
anuchandy May 11, 2023
cfd205d
Build NonSessionProcessor on v2 stack lower client
anuchandy May 11, 2023
1a4c4e4
import Instrumentation from impl package, remove unused import
anuchandy May 11, 2023
b5487aa
Stabilize NonSessionProcessor (V2), prepare for the integration of No…
anuchandy May 14, 2023
f54a954
Spotbug updates for the NonSessionProcessor type
anuchandy May 14, 2023
1a5fa88
Review feedback: Rename ConsumerSettings to ConsumerFactory
anuchandy May 14, 2023
42263a9
Review feedback: Removing unused ReceiveLinkHandler2::Constructor
anuchandy May 14, 2023
fb3755d
Review feedback: Rename CreditAccounting to CreditAccountingStrategy
anuchandy May 14, 2023
da27876
Review Feedback: log prefetch validation-error in CreditAccountingStr…
anuchandy May 16, 2023
a5126fd
Exposing connectionId in the internal AmqpReceiveLink contract.
anuchandy May 16, 2023
3ffdbda
Logging improvements: Log connectionId in MessageFlux code paths, cle…
anuchandy May 16, 2023
5cc4acd
Review Feedback: Using ConfigurationProperty to read application's op…
anuchandy May 16, 2023
a0c24f1
MessageFlux one-line log improvement
anuchandy May 17, 2023
4994e7b
NonSessionProcessor: Avoid thread hopping when max-concurrent-calls is 1
anuchandy May 17, 2023
92eb6a6
Wire V2 NonSessionProcessor to public Processor API., Scoping integra…
anuchandy May 17, 2023
ab03510
Enabling Non-Session Processor tests to test V2 in addition to V1 (Tr…
anuchandy May 17, 2023
a9c6e13
ServiceBusClientBuilder: Move the assigning default for retryOptions …
anuchandy May 18, 2023
00ac4fe
Better names for the internal types in NonSessionProcessor, scoping T…
anuchandy May 18, 2023
e68551f
NonSessionProcessor: Adding tracing for the message and enable tracin…
anuchandy May 18, 2023
1c8c8f6
Adding a test to MessageFluxIsolatedTest assert receiver closure via …
anuchandy May 18, 2023
0d45954
NonSessionProcessor: close the current pump's underlying client befor…
anuchandy May 18, 2023
1b7024c
Adding package internal beginIntern method to RollingMessagePump for …
anuchandy May 19, 2023
b7c90a7
Adding unit-tests for NonSessionProcessor.RollingMessagePump
anuchandy May 19, 2023
dd8ff2a
NonSessionProcessor: Lazy create pump completetion error
anuchandy May 20, 2023
f01c7fb
Unit test update: asserting user thrown error gets notified to error-…
anuchandy May 20, 2023
1aaf523
unit-tests cleanup for NonSessionProcessor.RollingMessagePump
anuchandy May 20, 2023
e775845
Trace lockrenew for NonSessionProcessor, nit test doc update.
anuchandy May 22, 2023
743ba30
Enable V2 testing for ServiceBusSenderAsyncClientTest
anuchandy May 22, 2023
4b4eba2
NonSessionProcessor: Removing the error check that is never applicabl…
anuchandy May 23, 2023
c746fd0
Enabling AutoDisposition|LockRenewal (deprecated) for LowLevel Client…
anuchandy May 23, 2023
31320fe
Enable ServiceBusReceiverAsyncClientTest unit tests for V2 as well
anuchandy May 23, 2023
8d8b2c4
Implementing caching for RequestResponseChannel. (e.g. cbs and manage…
anuchandy Jun 7, 2023
fac6df8
Adding tests for RequestResponseChannel caching.
anuchandy Jun 7, 2023
bc41273
review feedback on logging
anuchandy Jun 7, 2023
8be0acf
Spotbug: adding exclusion for return value handling of schedule in Re…
anuchandy Jun 26, 2023
16bbfa2
Adding common ReceiversPumpingScheduler for ReactorReceiver's message…
anuchandy Jun 26, 2023
fc552fc
Using ReceiversPumpingScheduler in ServiceBusReactorReceiver
anuchandy Jun 26, 2023
ddd9843
ReceiversPumpingScheduler JavaDoc cleanup
anuchandy Jun 27, 2023
bfd5f7f
Addressing the potential subscription leak in MessageFlux when the re…
anuchandy Jun 28, 2023
c0e3740
ReceiversPumpingScheduler invalid input handling, logging and better …
anuchandy Jun 29, 2023
da5025b
MessageFlux: remove one additional subscription to the endpoint state…
anuchandy Jun 29, 2023
ec6fd3e
Updating ReceiversPumpingScheduler to work correctly with Reactor tes…
anuchandy Jun 30, 2023
0f496b5
Avoid scheduling Non-Active events to the Scheduler those simply get …
anuchandy Jul 1, 2023
bc05539
Fix ServiceBusReceiverAsyncClientTest syntax error on rebase
anuchandy Jul 11, 2023
5e38d46
MessageFlux: Adding support to turn off retry hence propagating error…
anuchandy Jul 19, 2023
1be215a
ReceiverUnsettledDeliveries::sendDisposition(,): Update API to return…
anuchandy Jul 19, 2023
3dded4d
Types to support Service Bus session feature on V2 stack. It addresse…
anuchandy Jul 24, 2023
fe8f052
Simplify SessionsMessagePump.RollingSessionReceiver.SessionLinkStream…
anuchandy Jul 24, 2023
bb77a1b
SessionsMessagePump: JavaDoc and code redability improvments
anuchandy Jul 24, 2023
f9cdb00
SessionsMessagePump: Java Doc & minor code cleanup for checkstyle
anuchandy Jul 25, 2023
d0641e4
adding words unknown to cspell
anuchandy Jul 25, 2023
b5322a2
SessionsMessagePump: Document the termination flow
anuchandy Jul 27, 2023
2f608ec
Rename ServiceBusSessionLinkAcquirer to ServiceBusSessionAcquirer. Ha…
anuchandy Jul 29, 2023
ee1bdbc
Unit tests for SessionsMessagePump
anuchandy Jul 29, 2023
d8a0171
SessionsMessagePump: tests for auto-dispositions, minor JavaDoc update
anuchandy Jul 29, 2023
0403e56
MessionsMessagePump test for closeAsync
anuchandy Jul 30, 2023
ae2db78
Adding an API getSessionProperties to internal contract ServiceBusRec…
anuchandy Jul 31, 2023
16351b7
Remove the closeAsync() API from SessionsMessagePump. Given the type …
anuchandy Aug 2, 2023
c0af25a
Update to pick the changes (method renames, param types) made in the …
anuchandy Aug 3, 2023
8d9f2db
NonSessionProcessor: minor JavaDoc improvement.
anuchandy Aug 5, 2023
80a9ad7
SessionsMessagePump: Adding polling similar to NonSessionProcessor.Ro…
anuchandy Aug 5, 2023
6a8893f
ServiceBusClientBuilder: Adding private package method buildForProces…
anuchandy Aug 5, 2023
6449169
Adding new type SessionProcessor, the type to roll from one SessionsM…
anuchandy Aug 5, 2023
8245462
SessionsMessagePump: Expose fqdn, entity-path to ease logging in Sess…
anuchandy Aug 5, 2023
62644dd
Integrate SessionProcessor to ServiceBusProcessorClient
anuchandy Aug 5, 2023
d5a6234
Adding pumpId to identify SessionMessagePump instance, Updating Sessi…
anuchandy Aug 7, 2023
210b412
Have NonSessionProcessor consistent with SessionProcessor (interms of…
anuchandy Aug 7, 2023
1b243f2
Extracting out the inner class NonSessionProcessor.MessagePump
anuchandy Aug 7, 2023
dfa323d
Renaming MessagePump to NonSessionMessagePump (to consistent with nam…
anuchandy Aug 7, 2023
87bcc02
SessionsMessagePump: Adding error and completion map to TerminatedExc…
anuchandy Aug 7, 2023
92f9854
Extracting TerminatedException out of SessionsMessagePump and NonSess…
anuchandy Aug 7, 2023
9e6a8d3
Rename TerminatedException to MessagePumpTerminatedException
anuchandy Aug 7, 2023
b61da58
Moving pumpId const to ClientConstants, JavaDoc improvements
anuchandy Aug 8, 2023
9cd9fe3
Adding ServiceBusProcessor which merges the responsibilities of NonSe…
anuchandy Aug 8, 2023
211484a
Integrate ServiceBusProcessor (abstracting non-session and session pu…
anuchandy Aug 10, 2023
6f14330
Deleting SessionProcessor and NonSessionProcessor given the new Servi…
anuchandy Aug 10, 2023
a7f04d0
Minor JavaDoc corrections
anuchandy Aug 10, 2023
a01fe72
Enabling opt-in option to run v2 session processor
anuchandy Aug 14, 2023
af8de3f
Enabling Tracing for Service Bus V2 Processor receive, and Reactor re…
lmolkova Aug 17, 2023
8ed8679
Removing message flow via nop Function in ServiceBusReceiverInstrumen…
anuchandy Aug 22, 2023
e78c008
Removing the word 'roller' from cspell.json
anuchandy Aug 22, 2023
2c2b342
spotbug-exclude: remove the regex form, simply using class name is su…
anuchandy Aug 22, 2023
54df7c6
Renaming ReactorHandlerProvider::createReceiveLinkHandler2 to createR…
anuchandy Aug 23, 2023
982a538
ServiceBusReceiverInstrumentationTests: Scope the Null message tests …
anuchandy Aug 23, 2023
7608531
remove addressed todo in ServiceBusReceiverInstrumentationTests
anuchandy Aug 28, 2023
f950473
Update MessageFlux.updateDisposition API to emit DeliveryNotOnLinkExc…
anuchandy Aug 28, 2023
99c1326
Minor readability improvement in ServiceBusReceiverAsyncClient
anuchandy Aug 28, 2023
27ff613
Avoid renew-lock enforcing later disposition through management node.…
anuchandy Aug 28, 2023
71a4b8d
V2: Fallback to management node for non-session message disposition, …
anuchandy Aug 28, 2023
a476f5f
V1 ServiceBusSessionManager: Use ServiceBusConnectionProcessor throug…
anuchandy Aug 31, 2023
cf24214
V2 Session readability improvement: Adding beginLockRenew API in Serv…
anuchandy Aug 31, 2023
d9ec98a
V2 Session: Adding support for Session Reactor and Sync Receive on ne…
anuchandy Aug 31, 2023
7bea8f9
feedback: group MessageFlux members by visisbility and keyword
anuchandy Aug 31, 2023
88936d8
feedback: use fullQualifiedNamespace not fqdn, remove unneed logging
anuchandy Aug 31, 2023
73fc0fb
feedback: use fullQualifiedNamespace not fqdn, rename hostName to hot…
anuchandy Aug 31, 2023
b1d8e14
Update ServiceBusProcessorTest::testProcessorWithTracingEnabledAndNul…
anuchandy Sep 27, 2023
44b743e
Make ReactorConnectionCache::retryWhenSpec method package private to …
anuchandy Sep 29, 2023
ba1a40f
Add tests for ReactorConnectionCache::retryWhenSpec method.
anuchandy Sep 29, 2023
20c2d92
Adding a method to validate the required parameters at the time of bu…
anuchandy Oct 5, 2023
05caeeb
Adding JavaDoc for ReceiversPumpingScheduler#isVTSchedulerMode() priv…
anuchandy Oct 5, 2023
f1b615c
ReactorReceiver: Replacing .doOnNext with .map to track the last sequ…
anuchandy Oct 5, 2023
88736f7
Moving logging string keys in ReceiverDeliveryHandler to ClientConsta…
anuchandy Oct 5, 2023
9e842a2
Renaming NonSessionMessagePump to MessagePump (Non session nature of …
anuchandy Oct 5, 2023
6533a79
Simplify ServiceBusAsyncConsumer: remove isDisposed member, since lin…
anuchandy Oct 5, 2023
c98a7ef
Move ReceiveLinkHandlerWrapper outside of ReactorReceiver, rename Req…
anuchandy Oct 5, 2023
04d80f6
remove use of 'this.' from the code sections if readability is not af…
anuchandy Oct 6, 2023
4ac4ccd
ReceiversPumpingScheduler: Using static ClientLogger instead of creat…
anuchandy Oct 31, 2023
ed91f64
ReceiverUnsettledDeliveries: Using DELIVERY_KEY, LINK_NAME_KEY for lo…
anuchandy Oct 31, 2023
55778a7
ReceiverDeliveryHandler: improve logOnDelivery private method readabi…
anuchandy Oct 31, 2023
382943a
MessageFlux: Defining UpdateDisposition contract to improve the reada…
anuchandy Oct 31, 2023
0b35f30
IsolatedTests: Adding JavaDoc indicating reason for these tests to ru…
anuchandy Oct 31, 2023
5bf8c6d
ReceiverUnsettledDeliveriesTest: Making logger and retryOptions static.
anuchandy Oct 31, 2023
19a6c4c
ReceiverUnsettledDeliveriesTest: Adding about logic in sendDispositio…
anuchandy Oct 31, 2023
405cb97
MessageFlux[EmissionDriven|RequestDriven]CreditFlowIsolatedTest: Usin…
anuchandy Oct 31, 2023
1088d93
MessageFlux[Isolated]Test: Using methodSource to provide common Credi…
anuchandy Oct 31, 2023
5b2d98a
Defining call-site as a constant and logging it through key-value.
anuchandy Oct 31, 2023
354436d
MessageFluxTest: Using static Atleast(n) instead of newing up it for …
anuchandy Oct 31, 2023
28eca55
ReactorConnectionCache: JavaDoc improvements, rename closeSignal() to…
anuchandy Oct 31, 2023
f961c7f
ReactorReceiver: Given the meter cannot be enabled dynamically, probe…
anuchandy Oct 31, 2023
0206472
MessageFlux: Removing the doOnEach operator applied to endpoint state…
anuchandy Nov 1, 2023
fadd589
ReactorReceiver: (unrelated to v2) remove the verbose log about no cr…
anuchandy Nov 1, 2023
fcb2cf1
Using latest unreleased versions of core libs
anuchandy Nov 9, 2023
bc1e3c1
Improving opt-in and opt-out of stack.
anuchandy Nov 10, 2023
73d4db2
ServiceBusAmqpLinkProvider: format code to wrap lines at 120 chars.
anuchandy Nov 10, 2023
f10b2b8
ClientConstants: consistently use camelCase of logging key values.
anuchandy Nov 10, 2023
115b0f5
ServiceBusSenderAsyncClientTest, ServiceBusSenderAsyncClientRecoveryI…
anuchandy Nov 10, 2023
cdf5253
azure-core-amqp_src: Removing JavaDoc unsupported self closing paragr…
anuchandy Nov 21, 2023
c1dd840
azure-core-amqp_test: Removing JavaDoc unsupported self closing parag…
anuchandy Nov 21, 2023
fa7d1f8
[Sessions]MessagePump: Improving RunOnWorker type to avoid using then…
anuchandy Nov 21, 2023
7238b1f
ServiceBusReactorSession,ServiceBusProcessorClientOptions: Removing u…
anuchandy Nov 21, 2023
4412033
ServiceBus[Processor|ReceiverAsyncClient]Test: Using ValueSource inst…
anuchandy Nov 21, 2023
9d0fae2
ServiceBusReceiverClient: Replacing updateAndGet in Sync-Receive with…
anuchandy Nov 21, 2023
791fdd6
Service Bus Batch Send: Avoiding pre-allocation of byte array (with s…
anuchandy Nov 22, 2023
efd3d99
Using latest unreleased versions of core libs
anuchandy Nov 22, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .vscode/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,10 @@
{
"filename": "sdk/servicebus/azure-messaging-servicebus/**",
"words": [
"anu",
"Conniey",
"liudmila",
"Milli",
"qpid",
"unretriable"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ the main ServiceBusClientBuilder. -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="ServiceBusSessionReceiver(Async)?Client.java"/>

<suppress checks="com.azure.tools.checkstyle.checks.UseCaughtExceptionCauseCheck" files="ServiceBusMessageBatch.java"/>
<!-- Supress the BufferOverflowException thrown from encodedSize method, this exception is caught, mapped and logged at the callsite -->
<suppress checks="com.azure.tools.checkstyle.checks.ThrowFromClientLoggerCheck" files="com.azure.messaging.servicebus.ServiceBusMessageBatch.java"/>

<!-- Suppress the long package name in autoconfigure.implementation.redis package -->
<suppress checks="PackageName" files="com.azure.spring.cloud.autoconfigure.implementation.redis.*" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2436,6 +2436,18 @@
<Bug pattern="SE_BAD_FIELD"/>
</Match>

<!-- The returned Disposable can be ignored since Scheduler is closed when Reactor closes. -->
<Match>
<Class name="com.azure.core.amqp.implementation.ReactorExecutor"/>
<Method name="start"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
</Match>
<Match>
<Class name="com.azure.core.amqp.implementation.ReactorExecutor"/>
<Method name="run"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
</Match>

<!-- For BinaryData, copying array contents degrades performance. The data returned as byte array from BinaryData is expected to be copied by the call if mutability is desired. -->
<Match>
<Class name="com.azure.core.implementation.util.ByteArrayContent"/>
Expand Down Expand Up @@ -2713,5 +2725,115 @@
<Bug pattern="BC_UNCONFIRMED_CAST_OF_RETURN_VALUE"/>
</Match>

<!-- Field is not initialized in Ctr but necessary safety measures are in place when accessing it -->
<Match>
<Class name="com.azure.messaging.servicebus.NonSessionProcessor"/>
<Field name="rollingMessagePump"/>
<Bug pattern="UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"/>
</Match>
<Match>
<Class name="com.azure.messaging.servicebus.SessionProcessor"/>
<Field name="rollingSessionsMessagePump"/>
<Bug pattern="UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"/>
</Match>
<Match>
<Class name="com.azure.messaging.servicebus.ServiceBusProcessor"/>
<Field name="rollingMessagePump"/>
<Bug pattern="UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"/>
</Match>
<!-- False positives -->
<Match>
<Class name="com.azure.messaging.servicebus.NonSessionProcessor$RollingNonSessionMessagePump"/>
<Or>
<Field name="builder"/>
<Field name="logger"/>
<Field name="disposable"/>
</Or>
<Bug pattern="SE_BAD_FIELD"/>
</Match>
<Match>
<Class name="com.azure.messaging.servicebus.SessionProcessor$RollingSessionsMessagePump"/>
<Or>
<Field name="builder"/>
<Field name="logger"/>
<Field name="disposable"/>
</Or>
<Bug pattern="SE_BAD_FIELD"/>
</Match>
<Match>
<Class name="com.azure.messaging.servicebus.ServiceBusProcessor$RollingMessagePump"/>
<Or>
<Field name="nonSessionBuilder"/>
<Field name="sessionBuilder"/>
<Field name="logger"/>
<Field name="disposable"/>
</Or>
<Bug pattern="SE_BAD_FIELD"/>
</Match>
<!-- The RollingNonSessionMessagePump and RollingSessionsMessagePump doesn't need to be serializable -->
<Match>
<Class
name="com.azure.messaging.servicebus.NonSessionProcessor$RollingNonSessionMessagePump"/>
<Bug pattern="SE_NO_SERIALVERSIONID"/>
</Match>
<Match>
<Class
name="com.azure.messaging.servicebus.SessionProcessor$RollingSessionsMessagePump"/>
<Bug pattern="SE_NO_SERIALVERSIONID"/>
</Match>
<Match>
<Class
name="com.azure.messaging.servicebus.ServiceBusProcessor$RollingMessagePump"/>
<Bug pattern="SE_NO_SERIALVERSIONID"/>
</Match>
<!-- False positives -->
<Match>
<Class
name="com.azure.messaging.servicebus.SessionsMessagePump$RollingSessionReceiver"/>
<Or>
<Field name="instrumentation"/>
<Field name="logger"/>
<Field name="messageFlux"/>
<Field name="receiversTracker"/>
<Field name="tracer"/>
<Field name="managementNode"/>
<Field name="serializer"/>
<Field name="workerScheduler"/>
</Or>
<Bug pattern="SE_BAD_FIELD"/>
</Match>
<Match>
<Class
name="com.azure.messaging.servicebus.SessionsMessagePump$RollingSessionReceiver$NextSessionStream"/>
<Or>
<Field name="newSession"/>
</Or>
<Bug pattern="SE_BAD_FIELD"/>
</Match>
<Match>
<Class
name="com.azure.messaging.servicebus.ServiceBusSingleSessionManager$SessionReceiverStream"/>
<Or>
<Field name="sessionReceiver"/>
</Or>
<Bug pattern="SE_BAD_FIELD"/>
</Match>
<!-- The RollingSessionReceiver, NextSessionStream, $SessionReceiverStream doesn't need to be serializable -->
<Match>
<Class
name="com.azure.messaging.servicebus.SessionsMessagePump$RollingSessionReceiver"/>
<Bug pattern="SE_NO_SERIALVERSIONID"/>
</Match>
<Match>
<Class
name="com.azure.messaging.servicebus.SessionsMessagePump$RollingSessionReceiver$NextSessionStream"/>
<Bug pattern="SE_NO_SERIALVERSIONID"/>
</Match>
<Match>
<Class
name="com.azure.messaging.servicebus.ServiceBusSingleSessionManager$SessionReceiverStream"/>
<Bug pattern="SE_NO_SERIALVERSIONID"/>
</Match>

<!-- Comment to force CI to run Spotbugs against all SDKs -->
</FindBugsFilter>
3 changes: 3 additions & 0 deletions eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,9 @@ com.azure.tools:azure-sdk-build-tool;1.0.0;1.1.0-beta.1
# In the pom, the version update tag after the version should name the unreleased package and the dependency version:
# <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->

unreleased_com.azure:azure-core;1.46.0-beta.1
unreleased_com.azure:azure-core-amqp;2.9.0-beta.8

# Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current
# version and set the version to the released beta. Released beta dependencies are only valid
# for dependency versions. These entries are specifically for when we've released a beta for
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.46.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core;current} -->
<version>1.46.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import reactor.core.scheduler.Scheduler;

/**
* Provides links for send and receive.
*
* Under normal execution, the provider provides the actual links, but when running under unit test scenarios,
* the provider enables tests to inject mock links.
* @see ReactorProvider
* @see ReactorHandlerProvider
* @see AmqpMetricsProvider
*/
public class AmqpLinkProvider {
/**
* Creates an Amqp Link to send messages.
*
* @param amqpConnection The connection to host the Amqp Link.
* @param entityPath The message broker address for the sender.
* @param sender The underlying QPID sender.
* @param handler The QPID handler associated with the QPID sender.
* @param reactorProvider The provider for QPID Reactor and Reactor Executor.
* @param tokenManager Token manager for authorising with the CBS node.
* @param messageSerializer the Serializer to deserialize and serialize AMQP messages.
* @param retryOptions The Retry options.
* @param scheduler The scheduler to timeout the send operations those are not acknowledged by the broker.
* @param metricsProvider The metric provider (e.g. to record operations such as send).
* @return An Amqp Link.
*/
public AmqpSendLink createSendLink(AmqpConnection amqpConnection, String entityPath, Sender sender, SendLinkHandler handler,
ReactorProvider reactorProvider, TokenManager tokenManager, MessageSerializer messageSerializer,
AmqpRetryOptions retryOptions, Scheduler scheduler, AmqpMetricsProvider metricsProvider) {
return new ReactorSender(amqpConnection, entityPath, sender, handler, reactorProvider, tokenManager,
messageSerializer, retryOptions, scheduler, metricsProvider);
}

/**
* Creates an Amqp Link to receive messages.
*
* @param amqpConnection The connection to host the Amqp Link.
* @param entityPath The message broker address for the receiver.
* @param receiver The underlying QPID receiver.
* @param handler The QPID handler associated with the QPID receiver.
* @param tokenManager Token manager for authorising with the CBS node.
* @param dispatcher The dispatcher to schedule work to QPID Reactor Executor.
* @param retryOptions The Retry options.
* @param metricsProvider The metric provider (e.g. to record operations such as sending flow).
* @return An Amqp Link.
*/
public AmqpReceiveLink createReceiveLink(AmqpConnection amqpConnection, String entityPath, Receiver receiver,
ReceiveLinkHandler handler, TokenManager tokenManager, ReactorDispatcher dispatcher, AmqpRetryOptions retryOptions,
AmqpMetricsProvider metricsProvider) {
return new ReactorReceiver(amqpConnection, entityPath, receiver, new ReceiveLinkHandlerWrapper(handler), tokenManager, dispatcher, retryOptions,
metricsProvider);
}

// Note: ReceiveLinkHandler2 will become the ReceiveLinkHandler once the side by side support for v1 and v2 stack
// is removed. At that point "ReceiveLinkHandlerWrapper" and this createReceiveLink method will also be removed.
public AmqpReceiveLink createReceiveLink(AmqpConnection amqpConnection, String entityPath, Receiver receiver,
ReceiveLinkHandler2 handler, TokenManager tokenManager, ReactorDispatcher dispatcher,
AmqpRetryOptions retryOptions, AmqpMetricsProvider metricsProvider) {
return new ReactorReceiver(amqpConnection, entityPath, receiver, new ReceiveLinkHandlerWrapper(handler), tokenManager, dispatcher, retryOptions,
metricsProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static LoggingEventBuilder addSignalTypeAndResult(LoggingEventBuilder log
* and {@code getDescription()} under {@code errorDescription} keys.
*
* If errorCondition is {@code null} does not add properties.

*
* @return updated {@link LoggingEventBuilder} for chaining.
*/
public static LoggingEventBuilder addErrorCondition(LoggingEventBuilder logBuilder, ErrorCondition errorCondition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpLink;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.UncheckedIOException;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;

/**
Expand All @@ -18,6 +20,14 @@
* Specification 1.0: Links</a>
*/
public interface AmqpReceiveLink extends AmqpLink {
/**
* Gets the unique identifier of the Amqp connection hosting the receive link.
*
* @return The connection identifier.
*/
// Note: Ideally, we may expose connectionId in AmqpLink, but given it's a public contract, lets not do that until a use case needing it.
String getConnectionId();

/**
* Initialises the link from the client to the message broker and begins to receive messages from the broker.
*
Expand All @@ -26,6 +36,16 @@ public interface AmqpReceiveLink extends AmqpLink {
*/
Flux<Message> receive();

/**
* Updates the disposition state of a message uniquely identified by the given delivery tag.
*
* @param deliveryTag delivery tag of message.
* @param deliveryState Delivery state of message.
*
* @return A Mono that completes when the state is successfully updated and acknowledged by message broker.
*/
Mono<Void> updateDisposition(String deliveryTag, DeliveryState deliveryState);

/**
* Schedule to adds the specified number of credits to the link.
*
Expand All @@ -39,6 +59,17 @@ public interface AmqpReceiveLink extends AmqpLink {
*/
Mono<Void> addCredits(int credits);

/**
* Schedules an event to send a credit to the broker. The API takes a {@link Supplier} that returns the credit
* to send. The supplier allows providing the most up-to-date credit value when the scheduler picks the scheduled
* work for execution rather than the credit at the time of scheduling.
*
* @param creditSupplier the supplier that returns the credit to send.
* @throws RejectedExecutionException if the scheduler rejects the scheduling attempt (e.g., the scheduler is closed).
* @throws UncheckedIOException if an IO error occurs when scheduling.
*/
void addCredit(Supplier<Long> creditSupplier);

/**
* Gets the current number of credits this link has.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,25 @@ public final class ClientConstants {
public static final String LINK_NAME_KEY = "linkName";
public static final String ENTITY_PATH_KEY = "entityPath";
public static final String ENTITY_NAME_KEY = "entityName";
public static final String UPDATED_LINK_CREDIT_KEY = "updatedLinkCredit";
public static final String REMOTE_CREDIT_KEY = "remoteCredit";
public static final String IS_PARTIAL_DELIVERY_KEY = "delivery.isPartial";
public static final String IS_SETTLED_DELIVERY_KEY = "delivery.isSettled";
public static final String SESSION_NAME_KEY = "sessionName";
public static final String FULLY_QUALIFIED_NAMESPACE_KEY = "namespace";
public static final String OPERATION_NAME_KEY = "amqpOperation";
public static final String DELIVERY_KEY = "delivery";
public static final String DELIVERY_STATE_KEY = "deliveryState";
public static final String DELIVERY_TAG_KEY = "lockToken";
public static final String ERROR_CONDITION_KEY = "errorCondition";
public static final String ERROR_DESCRIPTION_KEY = "errorDescription";
public static final String EMIT_RESULT_KEY = "emitResult";
public static final String SIGNAL_TYPE_KEY = "signalType";
public static final String HOSTNAME_KEY = "hostName";
public static final String INTERVAL_KEY = "interval_ms";
public static final String INTERVAL_KEY = "intervalMs";
public static final String SUBSCRIBER_ID_KEY = "subscriberId";
public static final String PUMP_ID_KEY = "pumpId";
public static final String CALL_SITE_KEY = "callSite";

/**
* The default maximum allowable size, in bytes, for a batch to be sent.
Expand Down
Loading