原文:RxJava: thread safety of the Operators and Subjects 

绝大多数RxJavaOperators 和 Subjects都不是线程安全的。

RxJava很棒,除了并发。这个话题我很早就想讨论了。

Observable 协议

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

看到了吗?如果Observable同时从不同的线程发射数据是违背协议的!

让我们来看看一个非常简单的 operator take(n).

如果你打开操作符Take 的源码你会发现没有同步与线程安全的相关代码。

// …
int count;  
boolean completed;
@Override
public void onCompleted() {  
    if (!completed) {
        completed = true;
        child.onCompleted();
    }
}
// …

等等,难道RxJava不是异步响应式编程的一个解决方案吗?

是的,它是一个解决办法而且也非常棒。但是异步并不意味着并发。

大多数操作符和Subject不是线程安全的原因是性能,如果每个操作符,甚至于take(n)这种平常的操作符都是线程安全的话我们需要付出很大的代价。

David Karnok (RxJava的领导开发者)还列出了其他几个多数操作符都非线程安全的原因:

  1. 因为那样的话实现操作符的复杂性会显著提高(看看merge())。

  2. 会更难以维护操作符。

  3. 会更难以解释操作符以及相关的东西。

等等,你是在说你能通过multithreading emission打断take(n)?

是的,很抱歉,我确实可以😅。

@Test
fun `break take(n)`() {  
    val numberOfThreads = 10
    repeat(100000) {
        println("Iteration = $it")
        val publishSubject = PublishSubject.create<Int>() // Fix: .toSerialized().
        val actuallyReceived = AtomicInteger()
        publishSubject.take(3).subscribe { actuallyReceived.incrementAndGet() }
        val latch = CountDownLatch(numberOfThreads)
        var threads = listOf<Thread>()
        (0..numberOfThreads).forEach {
            threads += thread(start = false) {
                publishSubject.onNext(it)
                latch.countDown()
            }
        }
        threads.forEach { it.start() }
        latch.await()
        assertThat(actuallyReceived.get()).isEqualTo(3)
    }
}

(actually this code ^ crashes Kotlin compiler 😅, but works if you rewrite it in Java)

什么操作符不是线程安全的?

基本上,所有操作一个Observable的 操作符:take(n), map(), distinctUntilChanged()等等。

除了带scheduler的操作符,比如:window(…, scheduler), debounce(…, scheduler),等等。

别在这里找一个完整的列表,只需尝试去理解什么样的操作符是线程安全的,什么不是。

什么操作符是线程安全的?

通常来说,所有操作多个Observable的操作符都是线程安全的:merge(), combineLatest(), zip()等等。

它们对下游数据流做序列化,让非线程安全的下游数据流操作符能正常工作。

还是那句话,别在这里要完整的列表。理解原理!

So, the pattern for Operators is…?

大体是这样:

fun operatorThreadSafety() = if (operator.worksWithOneObservable() &&  
  operator.supportsScheduling == false) {
  Operator.NOT_THREAD_SAFE_AND_THAT_IS_OK
} else {
  Operator.MOST_LIKELY_THREAD_SAFE
}

那么Subject呢?

这就是问题的所在…所有的Subject都不是线程安全的,除了SerializedSubject。

是的,你喜欢的PublishSubject 和 BehaviorSubject都不是线程安全的。

这其实有点危险!因为subject通常在不同的代码块中被共享,它们可能并行运行在不同的线程中。

这点我是吸取过教训的,我们有一个subject以及来自网络请求的多个数据流,它最终打断了下游数据流distinctUntilChanged() 和我们的业务逻辑。

去, 既然同步的Subject是危险的,那么我该如何做呢?

序列化之! 模式:

fun threadSafeSubject(subject: Subject) = if (you.writeToItFromMultipleThreads()) {  
  subject.toSerialized() // Serialize it! Now!
} else {
  subject // You're fine, use it as is.
}

如果你把Subject作为一个 event bus使用这尤其危险。比如我们在 StorIO中所做的那样。因为数据库中的改变发生在不同的线程,有并发的可能,我们把subject序列化,这样就保证了用户的安全。

我该对我自定义的同步发射数据的Observable做些什么?

首先,不要使用Observable.create():见 RxJava#PR#4253。

其次,你需要序列化Observable发射的数据,最简单的方式就是调用serialize() 。

What should I do in general with concurrency and RxJava?

只要不违背 The Observable Contract,并且如果你并行发射数据请serialize() Observable。