2016-04-14 37 views
1

kullanarak Apache Flink HA kümeye akışını dağıtmak olamazsorunsuz (bir JobManager ve birkaç TaskManagers ile) Apache Flink kurulumunu standalone için ben akışını dağıtabilir Flink CLI

------------------------------------------------------------ 
The program finished with the following exception: 

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds 
    at org.apache.flink.client.program.Client.runDetached(Client.java:406) 
    at org.apache.flink.client.program.Client.runDetached(Client.java:366) 
    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75) 
    at org.apache.flink.client.program.Client.runDetached(Client.java:278) 
    at org.apache.flink.client.CliFrontend.executeProgramDetached(CliFrontend.java:844) 
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:330) 
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) 
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) 
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds 
    at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:221) 
    at org.apache.flink.client.program.Client.runDetached(Client.java:403) 
    ... 7 more 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) 
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
    at scala.concurrent.Await$.result(package.scala:190) 
    at scala.concurrent.Await.result(package.scala) 
    at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:218) 
    ... 8 more 

Aktif İş Yöneticisi oturum açmak için aşağıdaki hataları yazma:

2016-04-14 13:54:44,160 WARN akka.remote.ReliableDeliverySupervisor      - Association with remote system [akka.tcp://[email protected]:62784] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 
2016-04-14 13:54:46,299 WARN org.apache.flink.runtime.jobmanager.JobManager    - Discard message LeaderSessionMessage(null,TriggerSavepoint(5de582462f334caee4733c60c6d69fd7)) because the expected leader session ID Some(72630119-fd0a-40e7-8372-45c93781e99f) did not equal the received leader session ID None. 
aynı komuta ve Bağımsız HA kümeye bu komut zam hatayı dağıtmak

Yani, böyle bir hatanın nedenini anlayamıyorum?

Gerekli ek bilgi varsa bana bildirin.

P.S.

Flink Dashboard'dan dağıt, Bağımsız HA kümesi için iyi çalışıyor. Bu sorun, yalnızca Flink CLI aracılığıyla dağıttığımda görünür.

Güncelleme

Ben açık hayvan bakıcısı, diskte Flink tarafından kullanılan ve Flink Bağımsız HA küme-dağıtmak yeniden net dizinleri. Sonra akış kullanımı bin/flink run komutunu çalıştırmaya çalışıyorum. Gördüğünüz gibi JobManager problem hakkında sadece bir satır yazabilir (bakınız flink - jobmanager-0-example-app-1.stag.local.log).

Tüm JobManagers ve TaskManagers aynı flink-conf.yaml kullanın:

jobmanager.heap.mb: 1024 
jobmanager.web.port: 8081 

taskmanager.data.port: 6121 
taskmanager.heap.mb: 2048 
taskmanager.numberOfTaskSlots: 4 
taskmanager.memory.preallocate: false 
taskmanager.tmp.dirs: /flink/data/task_manager 

blob.server.port: 6130 
blob.storage.directory: /flink/data/blob_storage 

parallelism.default: 4 

state.backend: filesystem 
state.backend.fs.checkpointdir: s3a://example-flink/checkpoints 

restart-strategy: none 
restart-strategy.fixed-delay.attempts: 2 
restart-strategy.fixed-delay.delay: 60s 

recovery.mode: zookeeper 
recovery.zookeeper.quorum: zookeeper-1.stag.local:2181,zookeeper-2.stag.local:2181,zookeeper-3.stag.local:2181 
recovery.zookeeper.path.root: /example/flink 
recovery.zookeeper.storageDir: s3a://example-flink/recovery 
recovery.jobmanager.port: 6123 

fs.hdfs.hadoopconf: /flink/conf 

Yani, bağımsız HA küme doğru şekilde yapılandırılmış gibi görünüyor.

Güncelleme 2

Bilginize: Ben anlatıldığı here olarak Bağımsız HA küme yüklemek istiyorum. YARN HA kümesi değil.

Güncelleme 3

İşte bin/flink CLI tarafından oluşturulan günlük geçerli: flink-username-client-hostname.local.log.

+0

Tüm jobmanager günlüğünü yapıştırabilir misiniz? –

+0

Uygun HA ayarlarını, (FLINK_HOME/conf/flink-conf.yaml' içinde bulunan) için kullanılan "flink-conf.yaml" dosyasında ayarladınız mı? –

+0

@TillRohrmann Günlükleri ekledim ve "flink-conf.yaml". Herhangi bir sorun görüyor musun? –

cevap

2

HA modunda bir Flink kümesini başlatırken, JobManager adresi ve onun lideri kimliği belirtilen ZooKeeper kümesine yazılır. JobManager ile iletişim kurmak için sadece adresi değil, aynı zamanda lider adresini de biliyorsunuz. Bu nedenle, CLI tarafından okunan 'flink-conf.yaml' dosyanızda aşağıdaki parametreleri belirtmelisiniz.Bu bilgilerle

recovery.mode: zookeeper 
recovery.zookeeper.quorum: address of your cluster 
recovery.zookeeper.path.root: ZK path you've started your cluster with 

istemci zookeeper küme ve nerede JobManager adresi ve lideri kimliğini bulmanız bulabileceğiniz biliyor.

+0

'bin/flink' CLI'nin flink-conf.yaml'den ayarları kullandığını söylemek istersiniz. Sağ? –

+1

Evet, cli "flink-conf.yaml" değerini okur. –

+0

"flink-conf.yaml" yi Bağımsız HA kümemden dev ortamıma kopyaladım ve bin/flink run işlevini çalıştırdım. Bundan sonra akışım HA kümesinde başladı. Bence bu 'flink-conf.yaml' ayarlarının bir kısmını okumak için kötü bir fikir çünkü bence bu kesinlikle bir anlaşılma değil ve bin/flink 'Bağımsız HA kümesine bağlanmak için gereken argümanlara sahip olmalı. –