2016-10-07 55 views
6

Genel olarak Akka akışları ve akışları için yeniyim, bu yüzden kavramsal bir düzeyde bir şeyi yanlış anlamış olabilirim, ancak gelecekteki bir çözüme dönüşene kadar geri tepmeyi yaratabileceğim herhangi bir yol var mı? Esasen ne yapmak istediğim şu şekildedir: Bir Akka akışının içinde bir Geleceğe ait geri tepme yaratın

object Parser { 
    def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ??? 
} 

val futures = FileIO.fromPath(path) 
    .map(st => Parser.parseBytesToSeq(st.toByteBuffer)) 
    .batch(1000, x => x)(_ ++ _) 
    .map(values => doAsyncOp(values)) 
    .runWith(Sink.seq) 

def doAsyncOp(Seq[ExampleObject]) : Future[Any] = ??? 

Bayt

bir dosyadan okuma ve ExampleObject s Seq s yayar ayrıştırıcı, için akışlarının ve bu bir Future döndüren bir zaman uyumsuz işlem akıyor edilir. Future çözülene kadar, akımın geri kalanı geri tepme durumuna gelinceye kadar devam etmek istiyorum, sonra Gelecek çözümlendiğinde bir başka Seq[ExampleObject] doAsyncOp geçirerek geri tepme devam eder ve böylece devam eder.

Şu anda bu çalışmayı var:

Await.result(doAsyncOp(values), 10 seconds) 

Ama benim anlayış bu dizisinin tamamını kilitleniyor ve kötü olmasıdır. Etrafında daha iyi bir yolu var mı?

Eğer bu yardımcı oluyorsa, büyük resim, Jawn ile çok büyük bir JSON dosyasını (belleğe sığmayacak kadar büyük) ayrıştırmaya çalışıyorum, daha sonra bu nesneler, ElasticSearch'e indekslenecek şekilde geçmelidir. ayrıştırıldı - ElasticSearch, bekleyen 50 işlem kuyruğuna sahip, eğer taşması durumunda yeni nesneleri reddetmeye başlar.

cevap

9

Oldukça kolay. Sen 4 paralellik seviyesi olduğu mapAync :)

val futures = FileIO.fromPath(path) 
    .map(st => Parser.parseBytesToSeq(st.toByteBuffer)) 
    .batch(1000, x => x)(_ ++ _) 
    .mapAsync(4)(values => doAsyncOp(values)) 
    .runWith(Sink.seq) 

kullanmak gerekir.