Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition when posting CronetMetrics #2191

Merged
merged 9 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions library/java/org/chromium/net/impl/AtomicCombinatoryState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.chromium.net.impl;

import java.util.concurrent.atomic.AtomicInteger;

/**
* "Compare And Swap" logic based class providing a mean to ensure that the last awaited
* "stateEvent" will be identified as so. Typically a "stateEvent" is a single bit flip.
*
* <p>This class is Thread Safe.
*/
final class AtomicCombinatoryState {

private final int mFinalState;
private final AtomicInteger mState = new AtomicInteger(0);

/**
* finalState must be a power of two minus one: 1, 3, 7, 15, ...
*/
AtomicCombinatoryState(int finalState) {
assert finalState > 0 && ((finalState + 1) & finalState) == 0;
this.mFinalState = finalState;
}

/**
* Returns true if the state reaches, for the first time, the final state. The provided stateEvent
* is atmomically ORed with the current state - the outcome is saved as the new state.
*/
boolean hasReachedFinalState(int stateEvent) {
assert stateEvent <= mFinalState;
while (true) {
int originalState = mState.get();
int updatedState = originalState | stateEvent;
if (mState.compareAndSet(originalState, updatedState)) {
return originalState != mFinalState && updatedState == mFinalState;
}
}
}
}
1 change: 1 addition & 0 deletions library/java/org/chromium/net/impl/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ android_library(
name = "cronvoy",
srcs = [
"Annotations.java",
"AtomicCombinatoryState.java",
"BidirectionalStreamBuilderImpl.java",
"CallbackExceptionImpl.java",
"CronetEngineBase.java",
Expand Down
141 changes: 83 additions & 58 deletions library/java/org/chromium/net/impl/CronetUrlRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,32 @@ public final class CronetUrlRequest extends UrlRequestBase {

@IntDef({CancelState.READY, CancelState.BUSY, CancelState.CANCELLED})
@Retention(RetentionPolicy.SOURCE)
@interface CancelState {
private @interface CancelState {
int READY = 0;
int BUSY = 1;
int CANCELLED = 2;
}

@IntDef({SucceededState.UNDETERMINED, SucceededState.FINAL_READ_DONE,
SucceededState.ON_COMPLETE_RECEIVED, SucceededState.SUCCESS_READY})
@IntDef(flag = true, // This is a bitmap.
value = {SucceededState.UNDETERMINED, SucceededState.FINAL_READ_DONE,
SucceededState.ON_COMPLETE_RECEIVED, SucceededState.SUCCESS_READY})
@Retention(RetentionPolicy.SOURCE)
@interface SucceededState {
int UNDETERMINED = 0;
int FINAL_READ_DONE = 1;
int ON_COMPLETE_RECEIVED = 2;
int SUCCESS_READY = 3;
private @interface SucceededState {
int UNDETERMINED = 0b00;
int FINAL_READ_DONE = 0b01;
int ON_COMPLETE_RECEIVED = 0b10;
int SUCCESS_READY = FINAL_READ_DONE | ON_COMPLETE_RECEIVED;
}

@IntDef(flag = true, // This is a bitmap.
value = {ReportState.INITIAL_STATE, ReportState.USER_FINAL_CALLBACK_DONE,
ReportState.NETWORK_FINAL_CALLBACK_RECEIVED, ReportState.REPORT_READY})
@Retention(RetentionPolicy.SOURCE)
private @interface ReportState {
int INITIAL_STATE = 0b00;
int USER_FINAL_CALLBACK_DONE = 0b01;
int NETWORK_FINAL_CALLBACK_RECEIVED = 0b10;
int REPORT_READY = USER_FINAL_CALLBACK_DONE | NETWORK_FINAL_CALLBACK_RECEIVED;
}

private static final String X_ENVOY = "x-envoy";
Expand Down Expand Up @@ -107,6 +119,28 @@ public final class CronetUrlRequest extends UrlRequestBase {
* exception), or cancellation.
*/
private final AtomicInteger mState = new AtomicInteger(State.NOT_STARTED);
/**
* Ensures that the "mCallback.onSucceeded" callback will be invoked after observing the required
* events, and that it will be done only once.
*
* <p>At the end of a successful request, "mCallback.onSucceeded" is invoked. Before doing so,
* two events must have occurred first: the "completion of the final read" and the "onComplete
* Network callback". The Thread involved with the last of these two events is in charge of the
* registering the task to execute "mCallback.onSucceeded" - this is intrinsically racy.
*/
private final AtomicCombinatoryState mSucceededState =
new AtomicCombinatoryState(SucceededState.SUCCESS_READY);
/**
* Ensures that the CronetMetrics will be posted after observing the required events, and that it
* will be done only once.
*
* <p>At the end of a request, mRequestFinishedListener is used to post the CronetMetrics. Before
* doing so, two events must have occurred first: the "final user callback" and the "final Network
* callback". The Thread involved with the last of these two events is in charge of the posting -
* this is intrinsically racy.
*/
private final AtomicCombinatoryState mReportState =
new AtomicCombinatoryState(ReportState.REPORT_READY);

private final AtomicBoolean mUploadProviderClosed = new AtomicBoolean(false);

Expand Down Expand Up @@ -139,7 +173,7 @@ public final class CronetUrlRequest extends UrlRequestBase {
/* These change with redirects. */
private final AtomicReference<EnvoyHTTPStream> mStream = new AtomicReference<>();
private final List<String> mUrlChain = new ArrayList<>();
private EnvoyFinalStreamIntel mEnvoyFinalStreamIntel;
private volatile EnvoyFinalStreamIntel mEnvoyFinalStreamIntel;
private long mBytesReceivedFromRedirects = 0;
private long mBytesReceivedFromLastRedirect = 0;
private CronvoyHttpCallbacks mCronvoyCallbacks;
Expand Down Expand Up @@ -273,7 +307,7 @@ public void read(final ByteBuffer buffer) {
}
if (mState.compareAndSet(State.AWAITING_READ, streamEnded() ? State.COMPLETE : State.READING)) {
if (streamEnded()) {
if (mCronvoyCallbacks.successReady(SucceededState.FINAL_READ_DONE)) {
if (mSucceededState.hasReachedFinalState(SucceededState.FINAL_READ_DONE)) {
onSucceeded();
}
return;
Expand Down Expand Up @@ -593,7 +627,7 @@ void onCanceled() {
public void run() {
try {
mCallback.onCanceled(CronetUrlRequest.this, mUrlResponseInfo);
maybeReportMetrics();
maybeReportMetrics(ReportState.USER_FINAL_CALLBACK_DONE);
} catch (Exception exception) {
Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onCanceled method", exception);
}
Expand All @@ -608,7 +642,7 @@ void onSucceeded() {
public void run() {
try {
mCallback.onSucceeded(CronetUrlRequest.this, mUrlResponseInfo);
maybeReportMetrics();
maybeReportMetrics(ReportState.USER_FINAL_CALLBACK_DONE);
} catch (Exception exception) {
Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", exception);
}
Expand All @@ -623,7 +657,7 @@ void onFailed() {
public void run() {
try {
mCallback.onFailed(CronetUrlRequest.this, mUrlResponseInfo, mException);
maybeReportMetrics();
maybeReportMetrics(ReportState.USER_FINAL_CALLBACK_DONE);
} catch (Exception exception) {
Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onFailed method", exception);
}
Expand Down Expand Up @@ -666,34 +700,33 @@ private void recordEnvoyFinalStreamIntel(EnvoyFinalStreamIntel envoyFinalStreamI
mUrlResponseInfo.setReceivedByteCount(envoyFinalStreamIntel.getReceivedByteCount() +
mBytesReceivedFromRedirects);
}
maybeReportMetrics(ReportState.NETWORK_FINAL_CALLBACK_RECEIVED);
}

private void recordEnvoyStreamIntel(EnvoyStreamIntel envoyStreamIntel) {
mUrlResponseInfo.setReceivedByteCount(envoyStreamIntel.getConsumedBytesFromResponse() +
mBytesReceivedFromRedirects);
}

// Maybe report metrics. This method should only be called on Callback's executor thread and
// after Callback's onSucceeded, onFailed and onCanceled.
private void maybeReportMetrics() {
if (mEnvoyFinalStreamIntel != null) {
Metrics metrics = getMetrics(mEnvoyFinalStreamIntel, mBytesReceivedFromRedirects);
final RequestFinishedInfo requestInfo =
new RequestFinishedInfoImpl(mInitialUrl, mRequestAnnotations, metrics,
getFinishedReason(), mUrlResponseInfo, mException);
mRequestContext.reportRequestFinished(requestInfo);
if (mRequestFinishedListener != null) {
try {
mRequestFinishedListener.getExecutor().execute(new Runnable() {
@Override
public void run() {
mRequestFinishedListener.onRequestFinished(requestInfo);
}
});
} catch (RejectedExecutionException failException) {
Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to executor",
failException);
}
private void maybeReportMetrics(@ReportState int reportStateEvent) {
if (!mReportState.hasReachedFinalState(reportStateEvent)) {
return;
}
Metrics metrics = getMetrics(mEnvoyFinalStreamIntel, mBytesReceivedFromRedirects);
final RequestFinishedInfo requestInfo =
new RequestFinishedInfoImpl(mInitialUrl, mRequestAnnotations, metrics, getFinishedReason(),
mUrlResponseInfo, mException);
mRequestContext.reportRequestFinished(requestInfo);
if (mRequestFinishedListener != null) {
try {
mRequestFinishedListener.getExecutor().execute(new Runnable() {
@Override
public void run() {
mRequestFinishedListener.onRequestFinished(requestInfo);
}
});
} catch (RejectedExecutionException failException) {
Log.e(CronetUrlRequestContext.LOG_TAG, "Exception posting task to executor", failException);
}
}
}
Expand All @@ -720,15 +753,6 @@ private int getFinishedReason() {
}
}

private static class HeadersList extends ArrayList<Map.Entry<String, String>> {}

private static class DirectExecutor implements Executor {
@Override
public void execute(Runnable runnable) {
runnable.run();
}
}

private static int determineNextState(boolean endStream, @State int original,
@State int desired) {
switch (original) {
Expand All @@ -741,10 +765,18 @@ private static int determineNextState(boolean endStream, @State int original,
}
}

private static class HeadersList extends ArrayList<Map.Entry<String, String>> {}

private static class DirectExecutor implements Executor {
@Override
public void execute(Runnable runnable) {
runnable.run();
}
}

private class CronvoyHttpCallbacks implements EnvoyHTTPCallbacks {

private final AtomicInteger mCancelState = new AtomicInteger(CancelState.READY);
private final AtomicInteger mSucceededState = new AtomicInteger(SucceededState.UNDETERMINED);
private volatile boolean mEndStream = false; // Accessed by different Threads

@Override
Expand Down Expand Up @@ -796,7 +828,10 @@ public void run() {
checkCallingThread();
try {
if (locationField != null) {
mCronvoyCallbacks = null; // Makes CronvoyHttpCallbacks abandoned.
// This CronvoyHttpCallbacks instance is already in an abandoned state at this point:
// mState == State.AWAITING_FOLLOW_REDIRECT. But mState will change soon, so this line
// puts the final nail in the coffin. isAbandoned() can only keep returning true.
mCronvoyCallbacks = null;
mStream.set(null);
mPendingRedirectUrl = URI.create(mCurrentUrl).resolve(locationField).toString();
mWaitingOnRedirect.set(true);
Expand Down Expand Up @@ -872,7 +907,8 @@ public void onTrailers(Map<String, List<String>> trailers, EnvoyStreamIntel stre
return;
}
if (mState.compareAndSet(State.READING, State.COMPLETE)) {
mCronvoyCallbacks.successReady(SucceededState.FINAL_READ_DONE);
// onComplete still needs to be called - this always returns false.
mSucceededState.hasReachedFinalState(SucceededState.FINAL_READ_DONE);
}
}

Expand Down Expand Up @@ -948,7 +984,7 @@ public void onComplete(EnvoyStreamIntel streamIntel, EnvoyFinalStreamIntel final
return;
}
recordEnvoyFinalStreamIntel(finalStreamIntel);
if (successReady(SucceededState.ON_COMPLETE_RECEIVED)) {
if (mSucceededState.hasReachedFinalState(SucceededState.ON_COMPLETE_RECEIVED)) {
onSucceeded();
}
}
Expand Down Expand Up @@ -1037,17 +1073,6 @@ private void setUrlResponseInfo(Map<String, List<String>> responseHeaders, int r
Collections.unmodifiableList(headerList), false, selectedTransport, ":0");
}

private boolean successReady(@SucceededState int activityDone) {
@SucceededState int originalState;
@SucceededState int updatedState;
do {
originalState = mSucceededState.get();
updatedState = originalState | activityDone;
} while (!mSucceededState.compareAndSet(originalState, updatedState));
return originalState != SucceededState.SUCCESS_READY &&
updatedState == SucceededState.SUCCESS_READY;
}

private boolean completeAbandonIfAny(@State int originalState, @State int updatedState) {
if (originalState == State.COMPLETE || originalState == State.CANCELLED ||
originalState == State.ERROR) {
Expand Down
75 changes: 75 additions & 0 deletions test/java/org/chromium/net/impl/AtomicCombinatoryStateTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.chromium.net.impl;

import static org.assertj.core.api.Assertions.assertThat;

import androidx.test.ext.junit.runners.AndroidJUnit4;
import java.util.concurrent.atomic.AtomicInteger;
import org.chromium.net.testing.ConditionVariable;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(AndroidJUnit4.class)
public class AtomicCombinatoryStateTest {

@Test
public void trivialCase_false() {
assertThat(new AtomicCombinatoryState(1).hasReachedFinalState(0)).isFalse();
}

@Test
public void trivialCase_true() {
assertThat(new AtomicCombinatoryState(1).hasReachedFinalState(1)).isTrue();
}

@Test
public void partialState() {
assertThat(new AtomicCombinatoryState(3).hasReachedFinalState(1)).isFalse();
}

@Test
public void finalState() {
AtomicCombinatoryState atomicCombinatoryState = new AtomicCombinatoryState(3);
atomicCombinatoryState.hasReachedFinalState(2);
assertThat(atomicCombinatoryState.hasReachedFinalState(1)).isTrue();
}

@Test
public void finalState_twice() {
AtomicCombinatoryState atomicCombinatoryState = new AtomicCombinatoryState(3);
atomicCombinatoryState.hasReachedFinalState(2);
atomicCombinatoryState.hasReachedFinalState(1);
assertThat(atomicCombinatoryState.hasReachedFinalState(1)).isFalse();
}

@Test
public void finalState_multiThread() throws Exception {
ConditionVariable startBlock = new ConditionVariable();
ConditionVariable allThreadsReady = new ConditionVariable();
AtomicInteger sequence = new AtomicInteger(0);
AtomicCombinatoryState atomicCombinatoryState = new AtomicCombinatoryState(3);
AtomicInteger trueCount = new AtomicInteger(0);
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread() {
@Override
public void run() {
int sequenceId = sequence.incrementAndGet();
if (sequenceId == threads.length) {
allThreadsReady.open();
}
startBlock.block();
if (atomicCombinatoryState.hasReachedFinalState((sequenceId & 1) + 1)) { // 1 and 2 only.
trueCount.incrementAndGet(); // Should be executed only once.
}
}
};
threads[i].start();
}
allThreadsReady.block(); // This unblocks when all 10 Thread are blocking on "startBlock"
startBlock.open(); // Most threads will unblock simultaneously on a "multi-threading" CPU.
for (Thread thread : threads) {
thread.join(); // Wait for each Thread to die.
}
assertThat(trueCount.get()).isEqualTo(1);
}
}
2 changes: 2 additions & 0 deletions test/java/org/chromium/net/impl/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ envoy_package()
envoy_mobile_android_test(
name = "cronvoy_test",
srcs = [
"AtomicCombinatoryStateTest.java",
"CronvoyEngineTest.java",
"UrlRequestCallbackTester.java",
],
Expand All @@ -26,5 +27,6 @@ envoy_mobile_android_test(
"//library/java/org/chromium/net/impl:cronvoy",
"//library/kotlin/io/envoyproxy/envoymobile:envoy_interfaces_lib",
"//library/kotlin/io/envoyproxy/envoymobile:envoy_lib",
"//test/java/org/chromium/net/testing",
],
)
Loading