最新公告
  • 欢迎您光临AA分享网,这里是高级程序员技术学习、分享的分享平台立即加入我们
  • RxJava:操作符和Subject的线程安全

    原文:RxJava: thread safety of the Operators and Subjects 

    绝大多数RxJavaOperators 和 Subjects都不是线程安全的。

    RxJava很棒,除了并发。这个话题我很早就想讨论了。

    Observable 协议

    Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

    看到了吗?如果Observable同时从不同的线程发射数据是违背协议的!

    让我们来看看一个非常简单的 operator take(n).

    如果你打开操作符Take 的源码你会发现没有同步与线程安全的相关代码。

    // …
    int count;  
    boolean completed;
    @Override
    public void onCompleted() {  
        if (!completed) {
            completed = true;
            child.onCompleted();
        }
    }
    // …

    等等,难道RxJava不是异步响应式编程的一个解决方案吗?

    是的,它是一个解决办法而且也非常棒。但是异步并不意味着并发。

    大多数操作符和Subject不是线程安全的原因是性能,如果每个操作符,甚至于take(n)这种平常的操作符都是线程安全的话我们需要付出很大的代价。

    David Karnok (RxJava的领导开发者)还列出了其他几个多数操作符都非线程安全的原因:

    1. 因为那样的话实现操作符的复杂性会显著提高(看看merge())。

    2. 会更难以维护操作符。

    3. 会更难以解释操作符以及相关的东西。

    等等,你是在说你能通过multithreading emission打断take(n)?

    是的,很抱歉,我确实可以😅。

    @Test
    fun `break take(n)`() {  
        val numberOfThreads = 10
        repeat(100000) {
            println("Iteration = $it")
            val publishSubject = PublishSubject.create<Int>() // Fix: .toSerialized().
            val actuallyReceived = AtomicInteger()
            publishSubject.take(3).subscribe { actuallyReceived.incrementAndGet() }
            val latch = CountDownLatch(numberOfThreads)
            var threads = listOf<Thread>()
            (0..numberOfThreads).forEach {
                threads += thread(start = false) {
                    publishSubject.onNext(it)
                    latch.countDown()
                }
            }
            threads.forEach { it.start() }
            latch.await()
            assertThat(actuallyReceived.get()).isEqualTo(3)
        }
    }

    (actually this code ^ crashes Kotlin compiler 😅, but works if you rewrite it in Java)

    什么操作符不是线程安全的?

    基本上,所有操作一个Observable的 操作符:take(n), map(), distinctUntilChanged()等等。

    除了带scheduler的操作符,比如:window(…, scheduler), debounce(…, scheduler),等等。

    别在这里找一个完整的列表,只需尝试去理解什么样的操作符是线程安全的,什么不是。

    什么操作符是线程安全的?

    通常来说,所有操作多个Observable的操作符都是线程安全的:merge(), combineLatest(), zip()等等。

    它们对下游数据流做序列化,让非线程安全的下游数据流操作符能正常工作。

    还是那句话,别在这里要完整的列表。理解原理!

    So, the pattern for Operators is…?

    大体是这样:

    fun operatorThreadSafety() = if (operator.worksWithOneObservable() &&  
      operator.supportsScheduling == false) {
      Operator.NOT_THREAD_SAFE_AND_THAT_IS_OK
    } else {
      Operator.MOST_LIKELY_THREAD_SAFE
    }

    那么Subject呢?

    这就是问题的所在…所有的Subject都不是线程安全的,除了SerializedSubject。

    是的,你喜欢的PublishSubject 和 BehaviorSubject都不是线程安全的。

    这其实有点危险!因为subject通常在不同的代码块中被共享,它们可能并行运行在不同的线程中。

    这点我是吸取过教训的,我们有一个subject以及来自网络请求的多个数据流,它最终打断了下游数据流distinctUntilChanged() 和我们的业务逻辑。

    去, 既然同步的Subject是危险的,那么我该如何做呢?

    序列化之! 模式:

    fun threadSafeSubject(subject: Subject) = if (you.writeToItFromMultipleThreads()) {  
      subject.toSerialized() // Serialize it! Now!
    } else {
      subject // You're fine, use it as is.
    }

    如果你把Subject作为一个 event bus使用这尤其危险。比如我们在 StorIO中所做的那样。因为数据库中的改变发生在不同的线程,有并发的可能,我们把subject序列化,这样就保证了用户的安全。

    我该对我自定义的同步发射数据的Observable做些什么?

    首先,不要使用Observable.create():见 RxJava#PR#4253。

    其次,你需要序列化Observable发射的数据,最简单的方式就是调用serialize() 。

    What should I do in general with concurrency and RxJava?

    只要不违背 The Observable Contract,并且如果你并行发射数据请serialize() Observable。

    AA分享网一个高级程序员的学习、分享的IT资源分享平台
    AA分享网-企业网站源码-PHP源码-网站模板-视频教程-IT技术教程 » RxJava:操作符和Subject的线程安全

    常见问题FAQ

    免费下载或者VIP会员专享资源能否直接商用?
    本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
    提示下载完但解压或打开不了?
    最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,若小于网盘提示的容量则是这个原因。这是浏览器下载的bug,建议用百度网盘软件或迅雷下载。若排除这种情况,可在对应资源底部留言,或 联络我们.。
    找不到素材资源介绍文章里的示例图片?
    对于PPT,KEY,Mockups,APP,网页模版等类型的素材,文章内用于介绍的图片通常并不包含在对应可供下载素材包内。这些相关商业图片需另外购买,且本站不负责(也没有办法)找到出处。 同样地一些字体文件也是这种情况,但部分素材会在素材包内有一份字体下载链接清单。
    站壳网
    一个高级程序员模板开发平台

    发表评论

    • 74会员总数(位)
    • 2792资源总数(个)
    • 56本周发布(个)
    • 0 今日发布(个)
    • 237稳定运行(天)

    提供最优质的资源集合

    立即查看 了解详情