2015-03-04 11 views
21

İşte, neyi başarmaya çalıştığımı gösteren bir resim.Split Rx Birden çok akışa gözlenebilir ve tek tek işlenebilir

--abca - bbb - bir

bölünmüş

içine -a ----- bir ------- a -> bir akış

- --- b ------ bbb --- -> b akışı

------ c ---------- -> O; c

a.subscribe() 
b.subscribe() 
c.subscribe() 
ürününü bulun.

Şimdiye kadar, bulduğum her şey akışı bir groupBy() kullanarak ayırdı, ancak daha sonra her şeyi tek bir akışa yığdı ve hepsini aynı işlevde işledi. Ne yapmak istediğim, her türetilmiş akışı farklı bir şekilde işlemektir.

Yaptığım şey şu anda bir sürü filtre yapıyor. Bunu yapmanın daha iyi bir yolu var mı?

cevap

7

groupBy adresinden Observables çökmesine gerek yoktur. Bunun yerine onlara abone olabilirsiniz. Böyle

şey: ifadeleri tür çirkin ama en azından ayrı ayrı akışı işleyebilir bakarsak

String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"}; 

    Action1<String> a = s -> System.out.print("-a-"); 

    Action1<String> b = s -> System.out.print("-b-"); 

    Action1<String> c = s -> System.out.print("-c-"); 

    Observable 
      .from(inputs) 
      .groupBy(s -> s) 
      .subscribe((g) -> { 
       if ("a".equals(g.getKey())) { 
        g.subscribe(a); 
       } 

       if ("b".equals(g.getKey())) { 
        g.subscribe(b); 
       } 

       if ("c".equals(g.getKey())) { 
        g.subscribe(c); 
       } 
      }); 

. Belki onlardan kaçınmanın bir yolu vardır.

+0

Evet, muhtemelen bunlardan sakınmak isterim. Ancak, eğer çalışıyorsa, orijinal akışında filtreler yapmak yerine, tek bir yerde her şeyden biraz daha temiz görünecektir. Teşekkürler! –

+0

Bir çekicilik gibi çalıştım! –

+0

Harika! Eğer if ifadelerinden nasıl kurtulacağımı bulursam cevabımı güncelleyeceğim. – ihuk

31

Kolay pasta gibi, sadece sadece gözlemlenebilir kaynak sıcak olduğundan emin olmak gerekir

import rx.lang.scala.Observable 

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a") 
val hotO: Observable[String] = o.share 
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a") 
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b") 
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c") 

aSource.subscribe(o ⇒ println("A: " + o), println,() ⇒ println("A Completed")) 

bSource.subscribe(o ⇒ println("B: " + o), println,() ⇒ println("B Completed")) 

cSource.subscribe(o ⇒ println("C: " + o), println,() ⇒ println("C Completed")) 

scala

yılında filter

bir örnek kullanın. En kolay yol share için.

+2

Ya soğuk algınlığı mı yoksa ilk gözlenebilir mi? –

+7

@double_squeeze, tüm paylaşımlar abone olduğunda "paylaş" yerine "yayınla" yı kullanın ve 'bağlan'ı çağırın. –

+0

@Krzysztof Skyrzynecki – CrazyBS