2016-03-21 6 views
0

Çoklu Observer'u kullanmayı denedim , bir döngü içinde onNext meydana geldi. Her bir Gözlemci için işe yaramaz. Sadece ikinci bir abone alamaz Neden ilk ObserverRxScala, birden fazla Gözlemci ile abone olmak için sadece bir tane yayınlayın.

first observer - hi~ 
first observer - hi~ 
... 

için

import rx.lang.scala.Observable 

object SubscribeMultiEvent extends App{ 
    val obv = Observable.apply[String]{ s => 
    def printForever: Unit = { 
     s.onNext("hi~") 
     Thread.sleep(1000) 
     printForever 
    } 
    printForever 
    } 

    obv.subscribe(s => println(s"first observer - $s")) 
    obv.subscribe(s => println(s"second observer - $s")) 

    Thread.currentThread().join() 
} 

Tepki?

cevap

1

Kodlarınızdaki kodunuz Observable senkronize. Bu, ikinci subscribe ilk subscribe bitirene kadar çalışmayacağı anlamına gelir. Ve Observable'unuz hiçbir zaman tamamlanmadığından, ikinci subscribe çalışmayacak.

Bu sorunu gidermek için, Observable saatini asenkronize etmeniz gerekir. Başka bir Konuda çalışmak için subscribeOn'u kullanabilirsiniz. Ör sonunda

import rx.lang.scala.Observable 
import rx.lang.scala.schedulers.NewThreadScheduler 

object SubscribeMultiEvent extends App{ 
    val obv = Observable.apply[String]{ s => 
    def printForever: Unit = { 
     s.onNext("hi~") 
     Thread.sleep(1000) 
     printForever 
    } 
    printForever 
    }.subscribeOn(NewThreadScheduler()) 

    obv.subscribe(s => println(s"first observer - $s")) 
    obv.subscribe(s => println(s"second observer - $s")) 

    Thread.sleep(60000) 
} 

Thread.sleep(60000) önemlidir. RxJava'nın Konuları varsayılan olarak daemon'dur ve ana iş parçacığı bittiğinde, daha fazla daemon olmayan iş parçacığı olmadığı için, JVM çıkacaktır. Ana iş parçacığının durmasını önlemek için Thread.sleep(60000) gibi bir şey eklemeniz gerekir.

+0

Teşekkürler, bu harika. Dahası, bu şekilde bir döngü yayma olayı oluşturabilirim, ancak her zaman tüm gözlemcilerde vücuda abone olmasını isterim? Henüz her abone için yeni iş parçacığı kullanmak istemiyorum. – LoranceChen

+0

'Yayınla 'ihtiyacın var gibi görünüyor. Bu örneğe bakın: https://github.com/ReactiveX/RxScala/blob/a385e1a474a05af5173d3a6c5f380b0f87b50dff/examples/src/test/scala/examples/RxScalaDemo.scala#L438 – zsxwing

+0

'yayınla 'inanılmaz, Rx'i daha derinden öğrenmem gerekiyor. – LoranceChen