Skip to content

Commit

Permalink
Updated rxJava to version 2.
Browse files Browse the repository at this point in the history
First attempt to fix #1279
  • Loading branch information
nickb24 committed Nov 24, 2020
1 parent c38139c commit 1a6d6a3
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 158 deletions.
5 changes: 3 additions & 2 deletions app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ android {
minSdkVersion 18
// increasing target SDK version can cause compatibility issues with Android 7+
//noinspection ExpiredTargetSdkVersion
targetSdkVersion 23
targetSdkVersion 26
// change versionCode only when downgrade should be prevented
// eg, when data structures are incompatible
versionCode 1603091400
Expand Down Expand Up @@ -258,7 +258,8 @@ dependencies {
implementation 'com.squareup.retrofit:converter-gson:2.0.0-beta2'
implementation 'com.getpebble:pebblekit:3.1.0'
implementation 'com.github.jamorham:amazfitcommunication:master-SNAPSHOT'
implementation 'io.reactivex:rxjava:1.3.3'
//implementation 'io.reactivex:rxjava:1.3.3'
implementation "io.reactivex.rxjava2:rxjava:2.2.20"
implementation 'com.activeandroid:thread-safe-active-android:3.1.1'
//implementation 'com.github.lecho:hellocharts-android:v1.5.8'
implementation "com.polidea.rxandroidble2:rxandroidble:1.11.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,33 +140,30 @@ public static boolean doCheckAuth(Ob1G5CollectionService parent, RxBleConnection
final AuthRequestTxMessage authRequest = new AuthRequestTxMessage(getTokenSize(), usingAlt());
UserError.Log.i(TAG, "AuthRequestTX: " + JoH.bytesToHex(authRequest.byteSequence));

connection.setupNotification(Authentication)
connection.setupIndication(Authentication)
// .timeout(10, TimeUnit.SECONDS)
.timeout(15, TimeUnit.SECONDS) // WARN
// .observeOn(Schedulers.newThread()) // needed?
.doOnNext(notificationObservable -> {
connection.writeCharacteristic(Authentication, nn(authRequest.byteSequence))
connection.writeCharacteristic(Authentication, nn(authRequest.byteSequence))
.subscribe(
characteristicValue -> {
// Characteristic value confirmed.
if (d)
UserError.Log.d(TAG, "Wrote authrequest, got: " + JoH.bytesToHex(characteristicValue));
speakSlowly();
connection.readCharacteristic(Authentication).subscribe(
readValue -> {
authenticationProcessor(parent, connection, readValue);
}, throwable -> {
UserError.Log.e(TAG, "Could not read after AuthRequestTX: " + throwable);
});
//parent.background_automata();

UserError.Log.i(TAG, "SUCCESS: read after AuthRequestTX: " + JoH.bytesToHex(characteristicValue));
authenticationProcessor(parent, connection, characteristicValue);

},
throwable -> {
UserError.Log.e(TAG, "Could not write AuthRequestTX: " + throwable);
parent.incrementErrors();
}

);
}).flatMap(notificationObservable -> notificationObservable)
})
.flatMap(notificationObservable -> notificationObservable)
//.timeout(5, TimeUnit.SECONDS)
//.observeOn(Schedulers.newThread())
.subscribe(bytes -> {
Expand Down Expand Up @@ -229,24 +226,8 @@ private static void authenticationProcessor(final Ob1G5CollectionService parent,
connection.writeCharacteristic(Authentication, nn(new BaseAuthChallengeTxMessage(challengeHash).byteSequence))
.subscribe(
challenge_value -> {
authenticationProcessor(parent, connection, challenge_value);

speakSlowly();

connection.readCharacteristic(Authentication)
//.observeOn(Schedulers.io())
.subscribe(
status_value -> {
// interpret authentication response
authenticationProcessor(parent, connection, status_value);
}, throwable -> {
if (throwable instanceof OperationSuccess) {
UserError.Log.d(TAG, "Stopping auth challenge listener due to success");
} else {
UserError.Log.e(TAG, "Could not read reply to auth challenge: " + throwable);
parent.incrementErrors();
speakSlowly = true;
}
});
}, throwable -> {
UserError.Log.e(TAG, "Could not write auth challenge reply: " + throwable);
parent.incrementErrors();
Expand Down Expand Up @@ -301,14 +282,14 @@ private static void authenticationProcessor(final Ob1G5CollectionService parent,
break;

default:
UserError.Log.e(TAG, "Unhandled packet type in reply: " + pkt.type + " " + JoH.bytesToHex(readValue));
parent.incrementErrors();
UserError.Log.d(TAG, "Unhandled packet type in reply: " + pkt.type + " " + JoH.bytesToHex(readValue));
//parent.incrementErrors();
// TODO what to do here?
break;
}
}

private static final int SPEAK_SLOWLY_DELAY = 300;
private static final int SPEAK_SLOWLY_DELAY = 50;

private static int speakSlowlyDelay() {
return speakSlowly ? SPEAK_SLOWLY_DELAY : 0;
Expand Down Expand Up @@ -372,20 +353,12 @@ public synchronized static boolean doKeepAliveAndBondRequest(Ob1G5CollectionServ
.subscribe(
bondRequestValue -> {
UserError.Log.d(TAG, "Wrote bond request value: " + JoH.bytesToHex(bondRequestValue));
speakSlowly();
connection.readCharacteristic(Authentication)
.observeOn(Schedulers.io())
.timeout(10, TimeUnit.SECONDS)
.subscribe(
status_value -> {
UserError.Log.d(TAG, "Got status read after keepalive " + JoH.bytesToHex(status_value));
authenticationProcessor(parent, connection, status_value);
throw new OperationSuccess("Bond requested");
}, throwable -> {
UserError.Log.e(TAG, "Throwable when reading characteristic after keepalive: " + throwable);
});

// Wrote bond request successfully was here moved above - is this right?

UserError.Log.d(TAG, "Got status read after keepalive " + JoH.bytesToHex(bondRequestValue));
authenticationProcessor(parent, connection, bondRequestValue);
throw new OperationSuccess("Bond requested");

// Wrote bond request successfully was here moved above - is this right?
}, throwable -> {
// failed to write bond request retry?
if (!(throwable instanceof OperationSuccess)) {
Expand Down
Loading

0 comments on commit 1a6d6a3

Please sign in to comment.