2015-09-24 13 views
5

I have just started with Flink. I wrote the code below and it fails with the error "The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper".

Is there anything I am doing wrong?

Version: Flink v0.9.1 (for Hadoop 1), Hadoop not used: local execution. Shell: Scala shell.

Code:

val env = ExecutionEnvironment.getExecutionEnvironment 
val text = env.readTextFile("/home/ashish/Downloads/spark/synop.201501.csv") 
val data_split = text.flatMap{_.split(';')} 
data_split.first(3).print() 

Note: the input file uses ';' as the field delimiter.
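For reference, `flatMap { _.split(';') }` turns each line of the file into one element per semicolon-separated field. The same split on a plain Scala string (using a hypothetical sample record, not a line from the actual file):

```scala
// Splitting one semicolon-delimited record the way the flatMap above
// does for every line of the input file.
val record = "07005;20150101;1013.2" // hypothetical sample line
val fields = record.split(';').toList
println(fields) // List(07005, 20150101, 1013.2)
```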

Scala-Flink> val data_split = text.flatMap{_.split(';')} 
data_split: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@... 
Scala-Flink> data_split.first(3).print() 
09/24/2015 09:20:14 Job execution switched to status RUNNING. 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to SCHEDULED 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to DEPLOYING 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to FAILED 
java.lang.Exception: Call to registerInputOutput() of invokable failed 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:501) 
    ... 1 more 
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:284) 
    at org.apache.flink.runtime.operators.RegularPactTask.instantiateUserCode(RegularPactTask.java:1507) 
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39) 
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72) 
    at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1378) 
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:290) 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:86) 
    ... 2 more 
Caused by: java.lang.ClassNotFoundException: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302) 
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264) 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) 
    ... 8 more 

09/24/2015 09:20:14 Job execution switched to status FAILING. 
09/24/2015 09:20:14 CHAIN GroupReduce (GroupReduce at org.apache.flink.api.scala.DataSet.first(DataSet.scala:707)) -> FlatMap (collect())(1/1) switched to CANCELED 
09/24/2015 09:20:14 DataSink (collect() sink)(1/1) switched to CANCELED 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) 
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:221) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: $anonfun$1 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:501) 
    ... 1 more 
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: $anonfun$1 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:284) 
    at org.apache.flink.runtime.operators.RegularPactTask.instantiateUserCode(RegularPactTask.java:1507) 
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39) 
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72) 
    at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1378) 
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:290) 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:86) 
    ... 2 more 
Caused by: java.lang.ClassNotFoundException: $anonfun$1 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302) 
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264) 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) 
    ... 8 more 
+1

A few things would help in answering this: (1) The third line (obtaining a new execution environment) should be removed. Mixing different environments will most likely cause problems (and may in fact be the cause of your problem). (2) Could you post the full exception stack trace? The root cause is missing; it should be further down in the stack trace under "Caused by". (3) Your code example appears to have truncated lines; could you post all of the lines? –

+0

`val env` should have been the first line... sorry about that. `val env = ExecutionEnvironment.getExecutionEnvironment` `val text = env.readTextFile("/home/ashish/Downloads/spark/synop.201501.csv")` `val data_split = text.flatMap{_.split(';')}` `data_split.first(3).print()` – ashish

+0

I have added the full error log. – ashish

Answer

2

The problem is the first line: `val env = ExecutionEnvironment.getExecutionEnvironment`.

In the Scala shell, there is already an ExecutionEnvironment bound to the variable `env`, configured so that the classes generated by the shell are loaded properly.

By creating a new ExecutionEnvironment, you override this pre-configured environment with one that is not set up properly.
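Concretely, the fix is to drop that first line and use the shell's pre-bound `env` directly. A sketch of the corrected session (it assumes the Flink Scala shell with its pre-bound `env`, and uses the path from the question):

```scala
// In the Flink Scala shell, `env` is already bound and properly
// configured; do not create a new ExecutionEnvironment.
val text = env.readTextFile("/home/ashish/Downloads/spark/synop.201501.csv")
val data_split = text.flatMap { _.split(';') }
data_split.first(3).print()
```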

+0

Stephan, could you point to a link showing how a locally tested Flink program should look? I am hitting the same problem with a Flink 1.2 project developed in an emacs-ensime environment. `env.readTextFile("file:/d:/data/test.csv").first(5).print()` works fine, but `env.readTextFile("file:/d:/data/test.csv").map(_.split(' ')).first(5).print()` does not. –

+0

And it runs fine in the sbt console under Windows cmd. In ensime's inferior Scala it fails with `org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: $anonfun$1`. –