原文:RxJava: thread safety of the Operators and Subjects
绝大多数RxJavaOperators 和 Subjects都不是线程安全的。
RxJava很棒,除了并发。这个话题我很早就想讨论了。
Observable 协议
Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.
看到了吗?如果Observable同时从不同的线程发射数据是违背协议的!
让我们来看看一个非常简单的 operator take(n).
如果你打开操作符Take 的源码你会发现没有同步与线程安全的相关代码。
// … int count; boolean completed; @Override public void onCompleted() { if (!completed) { completed = true; child.onCompleted(); } } // …
等等,难道RxJava不是异步响应式编程的一个解决方案吗?
是的,它是一个解决办法而且也非常棒。但是异步并不意味着并发。
大多数操作符和Subject不是线程安全的原因是性能,如果每个操作符,甚至于take(n)这种平常的操作符都是线程安全的话我们需要付出很大的代价。
David Karnok (RxJava的领导开发者)还列出了其他几个多数操作符都非线程安全的原因:
-
因为那样的话实现操作符的复杂性会显著提高(看看merge())。
-
会更难以维护操作符。
-
会更难以解释操作符以及相关的东西。
等等,你是在说你能通过multithreading emission打断take(n)?
是的,很抱歉,我确实可以😅。
@Test fun `break take(n)`() { val numberOfThreads = 10 repeat(100000) { println("Iteration = $it") val publishSubject = PublishSubject.create<Int>() // Fix: .toSerialized(). val actuallyReceived = AtomicInteger() publishSubject.take(3).subscribe { actuallyReceived.incrementAndGet() } val latch = CountDownLatch(numberOfThreads) var threads = listOf<Thread>() (0..numberOfThreads).forEach { threads += thread(start = false) { publishSubject.onNext(it) latch.countDown() } } threads.forEach { it.start() } latch.await() assertThat(actuallyReceived.get()).isEqualTo(3) } }
(actually this code ^ crashes Kotlin compiler 😅, but works if you rewrite it in Java)
什么操作符不是线程安全的?
基本上,所有操作一个Observable的 操作符:take(n), map(), distinctUntilChanged()等等。
除了带scheduler的操作符,比如:window(…, scheduler), debounce(…, scheduler),等等。
别在这里找一个完整的列表,只需尝试去理解什么样的操作符是线程安全的,什么不是。
什么操作符是线程安全的?
通常来说,所有操作多个Observable的操作符都是线程安全的:merge(), combineLatest(), zip()等等。
它们对下游数据流做序列化,让非线程安全的下游数据流操作符能正常工作。
还是那句话,别在这里要完整的列表。理解原理!
So, the pattern for Operators is…?
大体是这样:
fun operatorThreadSafety() = if (operator.worksWithOneObservable() && operator.supportsScheduling == false) { Operator.NOT_THREAD_SAFE_AND_THAT_IS_OK } else { Operator.MOST_LIKELY_THREAD_SAFE }
那么Subject呢?
这就是问题的所在…所有的Subject都不是线程安全的,除了SerializedSubject。
是的,你喜欢的PublishSubject 和 BehaviorSubject都不是线程安全的。
这其实有点危险!因为subject通常在不同的代码块中被共享,它们可能并行运行在不同的线程中。
这点我是吸取过教训的,我们有一个subject以及来自网络请求的多个数据流,它最终打断了下游数据流distinctUntilChanged() 和我们的业务逻辑。
去, 既然同步的Subject是危险的,那么我该如何做呢?
序列化之! 模式:
fun threadSafeSubject(subject: Subject) = if (you.writeToItFromMultipleThreads()) { subject.toSerialized() // Serialize it! Now! } else { subject // You're fine, use it as is. }
如果你把Subject作为一个 event bus使用这尤其危险。比如我们在 StorIO中所做的那样。因为数据库中的改变发生在不同的线程,有并发的可能,我们把subject序列化,这样就保证了用户的安全。
我该对我自定义的同步发射数据的Observable做些什么?
首先,不要使用Observable.create():见 RxJava#PR#4253。
其次,你需要序列化Observable发射的数据,最简单的方式就是调用serialize() 。
What should I do in general with concurrency and RxJava?
只要不违背 The Observable Contract,并且如果你并行发射数据请serialize() Observable。