Statik verileri önce veri kümesi API'sini kullanarak işlemek ve daha sonra bir akış işini yürütmek için DataStream API'sini kullanmak istiyorum. IDE'ye kod yazarsam, mükemmel çalışır. Ama yerel flink iş yöneticisi (tüm paralellik 1) üzerinde çalışırken denediğimde, akış kodu asla yürütmez!Flink: Tek bir programda Veri Kümesi ve Datastream API'si. Mümkün mü?
Örneğin, aşağıdaki kod çalışmıyor:
val parallelism = 1
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)
val envStatic = ExecutionEnvironment.getExecutionEnvironment
envStatic.setParallelism(parallelism)
val myStaticData = envStatic.fromCollection(1 to 10)
val myVal: Int = myStaticData.reduce(_ + _).collect().head
val theStream = env.fromElements(1).iterate(iteretion => {
val result = iteretion.map(x => x + myVal)
(result, result)
})
theStream.print()
env.execute("static and streaming together")
Çalıştığım bu şeyi almak için ne denemeliyim?
Kayıtlar: execution logs for above program
yürütme planı: plan görünüyor bir siklik olması.
Günlükler ne diyor? –
@TillRohrmann Bağlantı eklendi. –
İstemci günlüğü ne diyor? –