2016-04-11 15 views
0

Yalnızca yeni değer bir öncekinden farklı olduğunda, göze çarpan sıcak gözlemlenebilir değerlerin (bundan önce -1 ile başlayan) bir gözlemlenebilir oluşturmak istiyorum. . Ayrıca en yeni değerin hemen yeni abonelere yayılmasını istiyorum. Aşağıdaki kod ile geldim:"Yeniden yürütme" ve "autoConnect" kullanıldığında istisna daha fazla istisna "

PublishSubject<Integer> hotObservable = PublishSubject.create(); 

Observable<Integer> observable = hotObservable 
     .startWith(-1) 
     .distinctUntilChanged() 
     .replay(1) 
     .autoConnect(0); 

Ancak bu başarısız (bakılmaksızın observable abone önce yayar neyi hotObservable her zaman -1) Bilmiyorum zaman İlginçtir java.lang.IllegalStateException: more produced than requested ile yeni aboneye yayılan ilk değerden sonra otomatik olarak bağlanır, ancak abone olunan aboneler düzgün çalışır, son değeri alır ve sonra güncelleştirir:

.

Çalışmak için replay(1).autoConnect(0) alamıyorum ve bir şeyi özlüyorum gibi hissediyorum; neden abone olmak ve abonelikten çıkmak isteseydim, autoConnect(0) ise? Böyle gözlemlenebilir oluşturmanın uygun yolu nedir?

Observable<Integer> observable = hotObservable 
     .startWith(-1) 
     .distinctUntilChanged() 
     .replay(1) 
     .autoConnect(); // With (0) it fails 

observable.subscribe().unsubscribe(); // Needed if we don't auto connnect 

hotObservable.onNext(1); 
hotObservable.onNext(2); 
hotObservable.onNext(3); // I want this value to be received by new subscriber 

TestSubscriber<Integer> subscriber = TestSubscriber.create(); 
observable.subscribe(subscriber); 

subscriber.assertNoErrors(); 
subscriber.assertValues(3); 

cevap

1

Ben RxJava 1.1.3 yukarıdaki kod ile More produced than requested hata alamadım:

Burada autoConnect(); observable.subscribe().unsubscribe() kullanmadığınız sürece başarısız test yöntemi bu. onaylama işlemi başarısız

nedeni onun Aboneler herhangi aslında talep dek replay memba bir şey istemeyeceğiz olmasıdır. TestSubscriber abone ilk ise, yayacak şekilde startWith tetikleyebilir -1 ve sonra başka bir şey almazlar dolayısıyla herhangi bir değeri korumaz PublishSubject geçer.

BehaviorSubject<Integer> hotObservable = BehaviorSubject.create(-1); 

Observable<Integer> observable = hotObservable.distinctUntilChanged(); 

hotObservable.onNext(1); 
hotObservable.onNext(2); 
hotObservable.onNext(3); 

TestSubscriber<Integer> subscriber = TestSubscriber.create(); 
observable.subscribe(subscriber); 

subscriber.assertNoErrors(); 
subscriber.assertValue(3); 
+0

1.1.3' çalışır 'versiyonunu teyit edebilir:

seni çok geçen değeri tutar ve yeni aboneler için o ile başlar BehaviorSubject olduğu aradığınızı inanıyoruz! Daha önce 1.1.2'deydim. BehaviorSubject' için - bu ilginç, ancak 'hotObservable' aslında başka bir yerde oluşturulur. BehaviorSubject b = BehaviorSubject.create (-1); hotObservable.subscribe (b) b.distinctUntilChanged() 'geçerli bir yaklaşım mı? – wasyl

+0

Evet, geçerli bir yaklaşımdır. HotObservable sinyallerinin çok hızlı olması durumunda 'onBackpressureBuffer' veya' onBackpressureDrop' eklemek isteyebilirsiniz. – akarnokd

+0

'onBackpressureLatest()' öğesini ekledim. Yardımın için çok teşekkürler! – wasyl