Skip to content

Commit

Permalink
fix concurrency bug in ScheduledObserver
Browse files Browse the repository at this point in the history
- found a concurrency bug while working on Netflix/Hystrix#123
- the following code would lock up occasionally due to onCompleted not being delivered:

```java
public class RunTest {
    public static void main(String[] args) {
        System.out.println("Starting test...");

        final ArrayList<String> strings = new ArrayList<String>(200000);

        int num = 10000;
        while (true) {
            long start = System.currentTimeMillis();
            final AtomicInteger count = new AtomicInteger();
            for (int i = 0; i < num; i++) {
                new TestService1(2, 5).toObservable().forEach(new Action1<Integer>() {

                    @OverRide
                    public void call(Integer v) {
                        count.addAndGet(v);
                    }
                });

                new TestService2("hello").toObservable().forEach(new Action1<String>() {

                    @OverRide
                    public void call(String v) {
                        strings.add(v);
                    }

                });
            }
            long time = (System.currentTimeMillis() - start);
            long executions = num * 2;
            System.out.println("Time: " + time + "ms for " + executions + " executions (" + (time * 1000) / executions + " microseconds)");
            System.out.println("   Count: " + count);
            System.out.println("   Strings: " + strings.size());
            strings.clear();
        }
    }
}
```

- Also made OperationObserveOn not use ScheduledObserver if the `ImmediateScheduler` is chosen to allow an optimization. I believe this optimization is safe because ScheduledObserver does not require knowledge of a Scheduler (such as for now()) and all we do is emit data to the Observer on a scheduler and if we know it's Immediate we can go direct and skip the enqueuing step. This allows shaving off a noticable number of microseconds per execution in the loop above.
  • Loading branch information
benjchristensen committed May 10, 2013
1 parent 18b1362 commit 1fa6ae3
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.ImmediateScheduler;
import rx.concurrency.Schedulers;
import rx.util.functions.Func1;

Expand All @@ -50,7 +51,12 @@ public ObserveOn(Observable<T> source, Scheduler scheduler) {

@Override
public Subscription call(final Observer<T> observer) {
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
if (scheduler instanceof ImmediateScheduler) {
// do nothing if we request ImmediateScheduler so we don't invoke overhead
return source.subscribe(observer);
} else {
return source.subscribe(new ScheduledObserver<T>(observer, scheduler));
}
}
}

Expand Down
74 changes: 30 additions & 44 deletions rxjava-core/src/main/java/rx/operators/ScheduledObserver.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,18 +18,13 @@
import rx.Notification;
import rx.Observer;
import rx.Scheduler;
import rx.concurrency.Schedulers;
import rx.util.functions.Action0;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/* package */class ScheduledObserver<T> implements Observer<T> {
private final Observer<T> underlying;
private final Scheduler scheduler;

private final ConcurrentLinkedQueue<Notification<T>> queue = new ConcurrentLinkedQueue<Notification<T>>();
private final AtomicInteger counter = new AtomicInteger(0);

public ScheduledObserver(Observer<T> underlying, Scheduler scheduler) {
this.underlying = underlying;
this.scheduler = scheduler;
Expand All @@ -46,47 +41,38 @@ public void onError(final Exception e) {
}

@Override
public void onNext(final T args) {
enqueue(new Notification<T>(args));
public void onNext(final T v) {
enqueue(new Notification<T>(v));
}

private void enqueue(Notification<T> notification) {
int count = counter.getAndIncrement();

queue.offer(notification);
private void enqueue(final Notification<T> notification) {

if (count == 0) {
processQueue();
}
}

private void processQueue() {
scheduler.schedule(new Action0() {
Schedulers.currentThread().schedule(new Action0() {
@Override
public void call() {
Notification<T> not = queue.poll();

switch (not.getKind()) {
case OnNext:
underlying.onNext(not.getValue());
break;
case OnError:
underlying.onError(not.getException());
break;
case OnCompleted:
underlying.onCompleted();
break;
default:
throw new IllegalStateException("Unknown kind of notification " + not);

}

int count = counter.decrementAndGet();
if (count > 0) {
scheduler.schedule(this);
}

scheduler.schedule(new Action0() {
@Override
public void call() {
switch (notification.getKind()) {
case OnNext:
underlying.onNext(notification.getValue());
break;
case OnError:
underlying.onError(notification.getException());
break;
case OnCompleted:
underlying.onCompleted();
break;
default:
throw new IllegalStateException("Unknown kind of notification " + notification);

}
}
});
}

});
}
};

}

0 comments on commit 1fa6ae3

Please sign in to comment.