Skip to content

Commit

Permalink
Merge branch 'RequestResponseWorkspace' into RequestResponseBinding
Browse files Browse the repository at this point in the history
  • Loading branch information
Bret Ambrose committed Jan 14, 2025
2 parents 75abbab + 9e2f0f4 commit 0713cad
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 230 deletions.
467 changes: 270 additions & 197 deletions .github/workflows/ci.yml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crt/aws-c-common
2 changes: 1 addition & 1 deletion crt/s2n
Submodule s2n updated from 493b77 to 2e79e7
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public long getConnectionAcquisitionTimeoutInMilliseconds() {
* @param connectionAcquisitionTimeoutInMilliseconds timeout in milliseconds.
* @return this
*/
public HttpClientConnectionManagerOptions withConnectionAcquisitionTimeoutInMilliseconds(int connectionAcquisitionTimeoutInMilliseconds) {
public HttpClientConnectionManagerOptions withConnectionAcquisitionTimeoutInMilliseconds(long connectionAcquisitionTimeoutInMilliseconds) {
this.connectionAcquisitionTimeoutInMilliseconds = connectionAcquisitionTimeoutInMilliseconds;
return this;
}
Expand All @@ -325,7 +325,7 @@ public long getMaxPendingConnectionAcquisitions() {
* @param maxPendingConnectionAcquisitions maximum pending acquisitions allowed
* @return this
*/
public HttpClientConnectionManagerOptions withMaxPendingConnectionAcquisitions(int maxPendingConnectionAcquisitions) {
public HttpClientConnectionManagerOptions withMaxPendingConnectionAcquisitions(long maxPendingConnectionAcquisitions) {
this.maxPendingConnectionAcquisitions = maxPendingConnectionAcquisitions;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public boolean getValidateChecksum() {
* The list of algorithms for user to pick up when validate the checksum. Client
* will pick up the algorithm from the list with the priority based on
* performance, and the algorithm sent by server. The priority based on
* performance is [CRC32C, CRC32, SHA1, SHA256].
* performance is [CRC64NVME, CRC32C, CRC32, SHA1, SHA256].
*
* If the response checksum was validated by client, the result will indicate
* which algorithm was picked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1132,9 +1132,10 @@ public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn on

@Override
public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) {
connectedFuture.completeExceptionally(new Exception(
"[" + client_name + "] Could not connect! Error code is: " + onConnectionFailureReturn.getErrorCode()
));
// failing the connected future here is not valid from a race condition standpoint. It is possible that
// the interrupting client itself gets interrupted and fails to fully connect due to the original client
// interrupting it. Eventually it will succeed (briefly) as the two clients fight over the client id
// with increasing reconnect backoff.
}

@Override
Expand Down Expand Up @@ -2083,6 +2084,9 @@ public void Op_UC4() {
eventsTwo.connectedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
subscriber.subscribe(subscribePacketBuilder.build()).get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);

// Paranoid about service-side eventual consistency. Add a wait to reduce chances of a missed will publish.
Thread.sleep(2000);

publisher.stop(disconnectPacketBuilder.build());

publishEvents.publishReceivedFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);
Expand Down Expand Up @@ -2171,11 +2175,6 @@ public void Op_SharedSubscription() {
// Wait a little longer just to ensure that no packets beyond expectations are arrived.
publishEvents.afterCompletionFuture.get(OPERATION_TIMEOUT_TIME, TimeUnit.SECONDS);

// Check that both clients received packets.
// PublishEvents_Futured_Counted also checks for duplicated packets, so this one assert is enough
// to ensure that AWS IoT Core sent different packets to different subscribers.
assertTrue(publishEvents.clientsReceived.size() == 2);

subscriberOneClient.stop();
subscriberTwoClient.stop();
publisherClient.stop();
Expand Down
19 changes: 0 additions & 19 deletions src/test/java/software/amazon/awssdk/crt/test/SelfPubSubTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ public SelfPubSubTest() {
static final String TEST_TOPIC = "publish/me/senpai/" + UUID.randomUUID().toString();
static final String TEST_PAYLOAD = "PUBLISH ME! SHINY AND CHROME!";

int pubsAcked = 0;
int subsAcked = 0;

@Test
public void testPubSub() {
skipIfNetworkUnavailable();
Expand Down Expand Up @@ -65,27 +62,21 @@ public void testPubSub() {

CompletableFuture<Integer> subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE,
messageHandler);
subscribed.thenApply(unused -> subsAcked++);
int packetId = subscribed.get();

assertNotSame(0, packetId);
assertEquals("Single subscription", 1, subsAcked);

MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE,
false);
CompletableFuture<Integer> published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 1, pubsAcked);

published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 2, pubsAcked);

MqttMessage received = receivedFuture.get();
assertEquals("Received", message.getTopic(), received.getTopic());
Expand All @@ -94,11 +85,9 @@ public void testPubSub() {
assertEquals("Received", message.getRetain(), received.getRetain());

CompletableFuture<Integer> unsubscribed = connection.unsubscribe(TEST_TOPIC);
unsubscribed.thenApply(unused -> subsAcked--);
packetId = unsubscribed.get();

assertNotSame(0, packetId);
assertEquals("No Subscriptions", 0, subsAcked);
} catch (Exception ex) {
fail(ex.getMessage());
}
Expand Down Expand Up @@ -142,33 +131,25 @@ public void testPubSubOnMessage() {

try {
CompletableFuture<Integer> subscribed = connection.subscribe(TEST_TOPIC, QualityOfService.AT_LEAST_ONCE);
subscribed.thenApply(unused -> subsAcked++);
int packetId = subscribed.get();

assertNotSame(0, packetId);
assertEquals("Single subscription", 1, subsAcked);

MqttMessage message = new MqttMessage(TEST_TOPIC, TEST_PAYLOAD.getBytes(), QualityOfService.AT_LEAST_ONCE);
CompletableFuture<Integer> published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 1, pubsAcked);

published = connection.publish(message);
published.thenApply(unused -> pubsAcked++);
packetId = published.get();

assertNotSame(0, packetId);
assertEquals("Published", 2, pubsAcked);

CompletableFuture<Integer> unsubscribed = connection.unsubscribe(TEST_TOPIC);
unsubscribed.thenApply(unused -> subsAcked--);
packetId = unsubscribed.get();

assertNotSame(0, packetId);
assertEquals("No Subscriptions", 0, subsAcked);
} catch (Exception ex) {
fail(ex.getMessage());
}
Expand Down

0 comments on commit 0713cad

Please sign in to comment.