Genel olarak Akka akışları ve akışları için yeniyim, bu yüzden kavramsal bir düzeyde bir şeyi yanlış anlamış olabilirim, ancak gelecekteki bir çözüme dönüşene kadar geri tepmeyi yaratabileceğim herhangi bir yol var mı? Esasen ne yapmak istediğim şu şekildedir: Bir Akka akışının içinde bir Geleceğe ait geri tepme yaratın
object Parser {
def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}
val futures = FileIO.fromPath(path)
.map(st => Parser.parseBytesToSeq(st.toByteBuffer))
.batch(1000, x => x)(_ ++ _)
.map(values => doAsyncOp(values))
.runWith(Sink.seq)
def doAsyncOp(Seq[ExampleObject]) : Future[Any] = ???
Bayt
bir dosyadan okuma veExampleObject
s
Seq
s yayar ayrıştırıcı, için akışlarının ve bu bir
Future
döndüren bir zaman uyumsuz işlem akıyor edilir.
Future
çözülene kadar, akımın geri kalanı geri tepme durumuna gelinceye kadar devam etmek istiyorum, sonra Gelecek çözümlendiğinde bir başka
Seq[ExampleObject]
doAsyncOp
geçirerek geri tepme devam eder ve böylece devam eder.
Şu anda bu çalışmayı var:
Await.result(doAsyncOp(values), 10 seconds)
Ama benim anlayış bu dizisinin tamamını kilitleniyor ve kötü olmasıdır. Etrafında daha iyi bir yolu var mı?
Eğer bu yardımcı oluyorsa, büyük resim, Jawn ile çok büyük bir JSON dosyasını (belleğe sığmayacak kadar büyük) ayrıştırmaya çalışıyorum, daha sonra bu nesneler, ElasticSearch'e indekslenecek şekilde geçmelidir. ayrıştırıldı - ElasticSearch, bekleyen 50 işlem kuyruğuna sahip, eğer taşması durumunda yeni nesneleri reddetmeye başlar.