Akka

2016-04-10 44 views
0

'da hem Strict hem de Streamed WebSocket İletilerini Tüketmek Akka HTTP kullanarak bir web yuvası hizmeti oluşturmayı deniyorum. Birden çok kareye ulaşan Aktarılan iletilerin yanı sıra, toplam olarak gelen Katı iletilerle başa çıkmam gerekiyor. Web soketlerinin bir akışa aktarılmasını geçmek için handleWebSocketMessages() ile bir rota kullanıyorum.Akka

val route: Route = 
    get { 
    handleWebSocketMessages(createFlow()) 
    } 

def createFlow(): Flow[Message, Message, Any] = Flow[Message] 
    .collect { 
    case TextMessage.Strict(msg) ⇒ msg 
    case TextMessage.Streamed(stream) => ??? // <= What to do here?? 
    } 
    .via(createActorFlow()) 
    .map { 
    case msg: String ⇒ TextMessage.Strict(msg) 
    } 

def createActorFlow(): Flow[String, String, Any] = { 
    // Set Up Actors 
    // ... (this is working) 
    Flow.fromSinkAndSource(in, out) 
} 

nasıl iki kolu Sıkı ve Akış hem mesajlar gerçekten emin değilim: Ben kodu şöyle görünür.

.collect { 
    case TextMessage.Strict(msg) ⇒ Future.successful(msg) 
    case TextMessage.Streamed(stream) => stream.runFold("")(_ + _) 
    } 

Ama şimdi benim akışı açıkçası iletileri işlemek gerekir, özellikle de o zaman nasıl başa emin değilim sadece Strings, yerine gelecek [dize] işlemek zorundadır: Ben böyle bir şey yapabileceğini fark sırayla.

Bu akka sorununu gördüm, ki bu biraz alakalı görünüyor, ama tam olarak ihtiyacım olan şey değil (sanmıyorum?).

https://github.com/akka/akka/issues/20096

Herhangi bir yardım

cevap

0

nihai cevap dayalı Aşağıda (svezfaz'a teşekkürler) cevabı şunun gibi bir şeye dönüşmüştü:

val route: Route = 
    get { 
    handleWebSocketMessages(createFlow()) 
    } 

def createFlow(): Flow[Message, Message, Any] = Flow[Message] 
    .collect { 
    case TextMessage.Strict(msg) ⇒ 
     Future.successful(MyCaseClass(msg)) 
    case TextMessage.Streamed(stream) => stream 
     .limit(100)     // Max frames we are willing to wait for 
     .completionTimeout(5 seconds) // Max time until last frame 
     .runFold("")(_ + _)   // Merges the frames 
     .flatMap(msg => Future.successful(MyCaseClass(msg))) 
    } 
    .mapAsync(parallelism = 3)(identity) 
    .via(createActorFlow()) 
    .map { 
    case msg: String ⇒ TextMessage.Strict(msg) 
    } 

def createActorFlow(): Flow[MyCaseClass, String, Any] = { 
    // Set Up Actors as source and sink (not shown) 
    Flow.fromSinkAndSource(in, out) 
} 
2

Katlama mantıklı bir seçenek gibi geliyor appriciated olacaktır. senin akışlarında geleceği Handling kullanılarak yapılabilir (örneğin)

flowOfFutures.mapAsync(parallelism = 3)(identity) 

mapAsync docs uyarınca, gelen iletilerin düzeni korumak anlamına geldiğini unutmayın.

Farklı bir kayda göre

, diğer hassas önlemler akış WS iletileri işlemek için katlamak için mesajın bağlı zaman ve mekan için completionTimeout ve limitini kullanmak olabilir (örneğin)

stream.limit(x).completionTimeout(5 seconds).runFold(...) 
+0

Teşekkürler svezfaz. İpuçlarınız, sorunumun çözülmesine yardımcı oldu. Şimdi çalışıyorum. –

+0

Bu dava için daha fazla kolaylık yöntemi eklemek, bu arada bizim radarımıza da bir göz atın: https://github.com/akka/akka/issues/20096 –

+0

Teşekkürler. Konuya abone oldum. Bu soru/cevabın insanlara ortalama zamanda yardım edeceğini umuyorum. –