2013-08-04 17 views
12

Bazen kendimi Stream[X] ve function X => Future Y, Future[Stream[Y]] ile birleştirmek istediğim bir durumda buluyorum ve bir yol bulamıyorum. yapmak için. Mesela benBir Akışı bir Geleceğe döndüren bir işlevle eşleme

val x = (1 until 10).toStream 
def toFutureString(value : Integer) = Future(value toString) 

val result : Future[Stream[String]] = ??? 

Ben doğru sonucu verir ama Future, dönmeden önce tüm akışı tüketmek gibi görünüyor

val result = Future.Traverse(x, toFutureString) 

çalıştı sahip olan az ya da çok yenilgiler amaçlı labaratuvar

'u denedim, ancak buile derlenmiyor

val result = x.map(toFutureString) 

biraz garip ve yararsız döndürür Stream[Future[String]]

şey düzelttirmek Burada ne yapmalıyım?

Düzenleme: Ben Stream sıkışmış değilim, ben kafasını

işlemeye başlamadan önce tüm öğeleri değerlendirilmesi üzerinde engellemez sürece, bir Iterator aynı işlemle aynı derecede mutlu olurdu

Edit2: Future.Traverse yapısının bir geleceği geri göndermeden önce tüm akışı geçmesi gerektiğinden% 100 emin değilim [Stream], ama bence öyle. Eğer değilse, bu kendi başına iyi bir cevap.

Edit3: Sırayla olması için sonuca da ihtiyacım yok, akışta sorun yok veya yineleyici ne olursa olsun iade edildi.

+0

Aşağıdaki cevabımı takip etmek için [bir sorun] (https://issues.scala-lang.org/browse/SI-7718) başvurduğumu unutmayın. –

+0

ah, harika @TravisBrown. Bunu kendim yapmak istedim, ama Jira – Martijn

+0

'a giriş yapmanın bir yolunu bulamadım Biraz belirsiz - daha önce koleksiyondaki tüm öğelere "toFutureString" uygulamasından kaçınmak mı istiyorsunuz? Görünüşe göre, bir Gelecek yaratmak için fazla mesai olmamalı. "Liste" deki kalan öğeler ise, bu ürünleri değerlendirmek için ne yapardı? Listede daha önce Geleceğin Tamamlanması? Scala'da bulabildiğim tüm dizi/travers işlemleri, tekil liste elemanlarında katı gibi görünüyordu. – pdxleif

cevap

9

traverse ile doğru yoldasınız, ancak maalesef standart kütüphanenin tanımının bu durumda biraz bozuk olduğu görülüyor - dönüşten önce akışı tüketmemesi gerekmiyor.

Future.traverse bir "çaprazlanabiliyorsa" tip sarılmış herhangi uygulamalı funktor çalışır çok daha genel bir fonksiyonun belirli sürümü (thesepapers ya da benim cevap örneğin daha fazla bilgi için here, bakınız).

Scalaz kütüphane bu daha genel bir sürümünü sağlar ve bu durumda (ben scalaz-contrib den Future için uygulamalı funktor örneğini alıyorum unutmayın beklendiği gibi çalışır; o hala Scalaz stabil versiyonları, henüz var

import scala.concurrent._ 
import scalaz._, Scalaz._, scalaz.contrib.std._ 

import ExecutionContext.Implicits.global 

def toFutureString(value: Int) = Future(value.toString) 

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString 

Bu sonsuz akışta hemen döner, bu yüzden bunun varlık ilk tüketen değil emin olduğu: Bu Future) sahip olmadığı Scala 2.9.2, karşı çapraz inşa etti. Bir dipnot olarak


: Eğer Future.traverse için the source bakarsanız bunu akışları durumunda elverişli, ancak gerekli değildir ya da uygun olan foldLeft açısından uygulanan olduğunu göreceksiniz.

+0

FYI, Scala 2.9.3 scala.concurrent –

+0

@ViktorKlang içerir: Doğru, ve bu örnekler [Scalaz çekirdeğine geliyor] (https://github.com/scalaz/scalaz/wiki/Roadmap#sip-14) yakında ama bildiğim kadarıyla henüz somut bir zaman çizelgesi yok. –

+0

Cool, bu iyi bir fikir gibi geliyor –

2

Stream hakkında unutmak: (Benim 8 çekirdekli kutuyu)

import scala.concurrent.Future 
import ExecutionContext.Implicits.global 

val x = 1 to 10 toList 
def toFutureString(value : Int) = Future { 
    println("starting " + value) 
    Thread.sleep(1000) 
    println("completed " + value) 
    value.toString 
} 

verim:

scala> Future.traverse(x)(toFutureString) 
starting 1 
starting 2 
starting 3 
starting 4 
starting 5 
starting 6 
starting 7 
starting 8 
res12: scala.concurrent.Future[List[String]] = [email protected] 

scala> completed 1 
completed 2 
starting 9 
starting 10 
completed 3 
completed 4 
completed 5 
completed 6 
completed 7 
completed 8 
completed 9 
completed 10 
Yani

tanesi 8 o aracılığıyla yapılandırılabilir olsa, derhal (her çekirdek için bir başladı olsun threadpool executor) ve daha sonra tamamlananlar daha fazla başlatıldı. Gelecek [Liste [Dize]] hemen döner ve bir duraklamadan sonra bu "tamamlandı x" mesajlarını basmaya başlar.

Bunun bir örneği, bir Listeniz [Url's] ve Url => Future [HttpResponseBody] türünde bir işleviniz olduğunda kullanılabilir. Bu listede o listede Future.traverse öğesini arayabilir ve bu http isteklerini paralel olarak başlatabilir, sonuçların bir listesi olan tek bir geleceği geri alabilirsiniz.

Ne için gittiğini beğenen bir şey miydi?

+0

Sanırım "akış öğelerini hevesle değerlendirmek istemiyorum", bu unsurlar Futures'ı başlatan göreve girdiklerinde paralellikle çelişiyor gibi görünüyor. Akışın nasıl tüketileceğini/değerlendirilmesini istiyorsunuz? – pdxleif