2017-11-18 166 views
7

Aşağıdakileri yapmak için bir Akka akışları birleştiricisi var mı (yoksa bu etkiyle ilgili bir şey)? (Şimdilik and diyelim.)İki Akış nasıl yan yana oluşturulur?

(flow1: Flow[I, O, Mat]).and[O2](flow2: Flow[I, O2, Mat]): Flow[I, (O, O2), Mat] 

semantik kaynağı ne olursa olsun, onun elemanları hem Flow s aktarılmasını sağlayacak ve elde edilen çıktılar bir demet olarak yeni Flow birleştirilebilir olacağını olurdu. (Kategori kuramdan oklarla aşina olanlar fonksiyonel programlama aromalı, ben &&& gibi bir şey arıyorum.) Yani zip ve alsoTo ilgili görünüyordu kütüphanede iki combinators vardır

. Ancak, eski, bir SourceShape ve ikincisi, bir SinkShape kabul eder. Ne de bir GraphShape kabul ediyorum. Bu neden böyle?

Benim kullanım durumu aşağıdaki gibi bir şey:

someSource 
    .via(someFlowThatDoesWhateverItWasDoingEarlierButNowAlsoEmitsInputsAsIs) 
    .runWith(someSink) 

Bu çalışır, ancak ben arıyorum:

someSource 
    .via(someFlowThatReturnsUnit.and(Flow.apply)) 
    .runWith(someSink) 

.and gibi bir şey bulamayan böyle benim özgün Flow modifiye Daha temiz, daha karmaşık bir çözüm.

+1

bir akış kesinlikle 1 değildir. (GraphDSL kullanabilir ve Yayın + Birleştirme kullanabilirsiniz) –

cevap

6

Bildirimi

Viktor Klang açıklamalarda belirtildiği gibi: girdi elemanı saymak ve çıkış bakımından 1: Bilindiği zaman Tuple2[O,O2] içine sıkıştırma hem akar flow1 & flow2, 1 olduğu tek geçerli olduğunu eleman sayısı

Grafik Tabanlı Çözelti

Bir demet yapısı, bir Graph içinde oluşturulabilir. Aslında, soru neredeyse mükemmel tanıtım örnek eşleşir:

enter image description here

linkte örnek kodu genişletme kullanabileceğiniz Broadcast ve Zip

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
    import GraphDSL.Implicits._ 
    val in = Source(1 to 10) 
    val out = Sink.ignore 

    val bcast = builder.add(Broadcast[Int](2)) 

    val merge = builder.add(Zip[Int, Int]()) //different than link 

    val f1, f2, f4 = Flow[Int].map(_ + 10) 

    val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link 

    in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out 
       bcast ~> f4 ~> merge 
    ClosedShape 
})//end RunnableGraph.fromGraph 

Biraz Hacky Akış Çözüm

Saf bir akış çözümü arıyorsanız, ara akışların kullanılması mümkündür, ancak Mat muhafaza olmaz ve her giriş elemanı için 2 akışları gerçekleşme durumlarını içerir: Bu sıkıştırma jenerik çoğu Akışları, sonra çıktı türünü olmak için

def andFlows[I, O, O2] (maxConcurrentSreams : Int) 
         (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) 
         (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] = 
    Flow[I].mapAsync(maxConcurrentStreams){ i => 

    val o : Future[O] = Source 
          .single(i) 
          .via(flow1) 
          .to(Sink.head[O]) 
          .run() 

    val o2 : Future[O2] = Source 
          .single(i) 
          .via(flow2) 
          .to(Sink.head[O2]) 
          .run() 

    o zip o2 
    }//end Flow[I].mapAsync 

Jenerik Zipping

olacak (Seq[O], Seq[O2]) olmak zorunda.Bu tür yukarıda andFlows fonksiyonunda Sink.seq kullanılarak yerine Sink.head ile üretilebilir: 1. bu nedenle böyle bir genel bağdaştırıcı (1 çıkışı için 1 giriş) zor olacaktır:

def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int) 
           (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed]) 
           (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] = 
    Flow[I].mapAsync(maxConcurrentStreams){ i => 

    val o : Future[Seq[O]] = Source 
           .single(i) 
           .via(flow1) 
           .to(Sink.seq[O]) 
           .run() 

    val o2 : Future[Seq[O2]] = Source 
           .single(i) 
           .via(flow2) 
           .to(Sink.seq[O2]) 
           .run() 

    o zip o2 
    }//end Flow[I].mapAsync