2016-04-12 26 views
2

Statik verileri önce veri kümesi API'sini kullanarak işlemek ve daha sonra bir akış işini yürütmek için DataStream API'sini kullanmak istiyorum. IDE'ye kod yazarsam, mükemmel çalışır. Ama yerel flink iş yöneticisi (tüm paralellik 1) üzerinde çalışırken denediğimde, akış kodu asla yürütmez!Flink: Tek bir programda Veri Kümesi ve Datastream API'si. Mümkün mü?

Örneğin, aşağıdaki kod çalışmıyor:

val parallelism = 1 

val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.setParallelism(parallelism) 

val envStatic = ExecutionEnvironment.getExecutionEnvironment 
envStatic.setParallelism(parallelism) 

val myStaticData = envStatic.fromCollection(1 to 10) 
val myVal: Int = myStaticData.reduce(_ + _).collect().head 

val theStream = env.fromElements(1).iterate(iteretion => { 
    val result = iteretion.map(x => x + myVal) 
    (result, result) 
}) 
theStream.print() 
env.execute("static and streaming together") 

Çalıştığım bu şeyi almak için ne denemeliyim?

Kayıtlar: execution logs for above program

yürütme planı: plan görünüyor bir siklik olması.

+0

Günlükler ne diyor? –

+0

@TillRohrmann Bağlantı eklendi. –

+0

İstemci günlüğü ne diyor? –

cevap

3

Birden çok alt işten oluşan bir Flink işiniz varsa, ör. count, collect veya print tarafından tetiklendiğinde, işi web arayüzü üzerinden gönderemezsiniz. Web arayüzü sadece tek bir Flink işini destekler.