2016-09-28 52 views
5

en Bazı yineleyici var diyelim:Gelecek [T] yayan kaynak nasıl ele alınır?

val nextElemIter: Iterator[Future[Int]] = Iterator.continually(...) 


Ve o İlerleticiden bir kaynak oluşturmak istiyorum:

val source: Source[Future[Int], NotUsed] = 
    Source.fromIterator(() => nextElemIter) 


Yani şimdi benim kaynak Future s yayar.

val source: Source[Int, NotUsed] = 
    Source.fromIterator(() => nextElemIter).mapAsync(1)(identity /* was n => n */) 


Ve şimdi T yerine Future[T] yayar düzenli bir kaynak vardır: Böyle bir şey vadeli Akka docs aşamalar arasında geçirilen veya başka bir yerde, bunun yerine, hep yapabileceğini görmemiştim. Ama bu hacky ve yanlış hissettiriyor.

Bu gibi durumlarla başa çıkmanın doğru yolu nedir?

+3

Ben mapAsync' 'düşünüyorum

def isGoodInt(i : Int) : Boolean = ??? //filter def transformInt(i : Int) : Int = ??? //map def combineInts(i : Int, j : Int) : Int = ??? //reduce 

Vadeli bu işlevleri kullanarak doğrudan bir yol sağlar burada gayet iyi. Sonuçta, bu amaç için tam olarak amaçlanmıştır - gelecekleri akışlara yaslamak. –

+1

'mapAsync (1) (kimlik)' bunu yapmak için uygun bir yoldur. – expert

+0

@expert düzenlenmiştir. –

cevap

4

Sorunuzu doğrudan yanıtlama: Anlattığınız amaç için mapAsync'u kullanma konusunda "hacky" hiçbir şeyin bulunmadığına dair Vladimir'in yorumuna katılıyorum. Altında yatan Int değerlerini Future açmak için daha doğrudan bir yol düşünemiyorum.

bir eşzamanlılık mekanizması olarak, geri-basınç gerektiğinde inanılmaz derecede faydalıdır, dolaylı soru ... Futures

Akışları sopa ile

deneyin cevaplama. Ancak, salt Future işlemlerinin uygulamalarda da yeri vardır.

Iterator[Future[Int]] ürününüz bilinen, sınırlı sayıda Future değerini üretecekse, daha sonra Vadeli kullanım için Futures'ı kullanarak takılmak isteyebilirsiniz.

Resim filtrelemek, düşünmek, & değerini azaltmak için düşünün Int. Eğer önerildiği gibi Akışı kullanmanın biraz dolaylı yolu ile karşılaştırıldığında

val finalVal : Future[Int] = 
    Future sequence { 
    for { 
     f <- nextElemIter.toSeq //launch all of the Futures 
     i <- f if isGoodInt(i) 
    } yield transformInt(i) 
    } map (_ reduce combineInts) 

:

val finalVal : Future[Int] = 
    Source.fromIterator(() => nextElemIter) 
     .via(Flow[Future[Int]].mapAsync(1)(identity)) 
     .via(Flow[Int].filter(isGoodInt)) 
     .via(Flow[Int].map(transformInt)) 
     .to(Sink.reduce(combineInts)) 
     .run()