2016-04-05 9 views
1

bir ağırlık fonksiyonuGrup akış elemanları

val weightFunction: Element => Int 
göz önüne alındığında, bir

val myFlow: Flow[Element] = ... //some flow.. 

hayal Her elemanı bir

val transformedFlow: Flow[List[Element]] 

gibi elde etmek istiyorum

transformedFlow bir Listedir [Öğe], öyle ki o listedeki elemanların ağırlıklarının toplamı verilen bir kontrolden daha büyüktür. stant.

Bunu nasıl başardım?

cevap

1

scan, biriken ağırlıklar akışı oluşturmak için zip öğelerin orijinal akışıyla sonuçları ve daha sonra splitAfter alt tabaka oluşturmak için nasıl kullanılır? Hatta aşağıdaki derlemeye denemedim ama anladınız umarım: (. Sen resultFlow üzerinde map(_.reverse) yapıyor düşünebilirsiniz)

val broadCast = builder.add(Broadcast[Element](2)) 
val zip = builder.add(Zip[Element, Boolean]) 

myFlow.shape.out ~> broadCast.in 

broadCast.out(0) ~> zip.in0 

broadCast.out(1).scan(0){ (totalWeight, elem) => 
    if(totalWeight > Limit) weightFunction(elem) 
    else totalWeight + weightFunction(elem) 
}.map(_ > Limit) ~> zip.in1 

val resultFlow = 
    zip.out.splitAfter(_._2) 
    .fold(List.empty[Element]){ case (list, (elem, _)) => elem :: list } 
    .concatSubstreams 

Edit: sen bile gerek yok scan bir dönüş türünü değiştirirseniz broadcast ve zip yapmak için , burada bir runnable kodu örneğine bakın: https://gist.github.com/MartinHH/a05a87269b1697d5f57a1c77db269767