2015-01-22 17 views
9

2 işçi ile tutulması (maven conf) kullanarak aşağıdaki kodu çalıştırmaya çalışıyorum ve her biri 2 çekirdeğe sahip ya da kıvılcım göndermeye çalışıyorum. bu koduspark ssc.textFileStream dizinden dosya aktarmıyor

15/01/22 21:57:13 INFO FileInputDStream: New files at time 1421944033000 ms: 

15/01/22 21:57:13 INFO JobScheduler: Added jobs for time 1421944033000 ms 
15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms 
15/01/22 21:57:13 INFO SparkContext: Starting job: foreach at StreamingKMean.java:33 
15/01/22 21:57:13 INFO DAGScheduler: Job 3 finished: foreach at StreamingKMean.java:33, took 0.000094 s 
Sentences Collected from files [] 
------------------------------------------- 
15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.0 from job set of time 1421944033000 ms 
Time: 1421944033000 ms 
-------------------------------------------15/01/22 21:57:13 INFO JobScheduler: Starting job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms 


15/01/22 21:57:13 INFO JobScheduler: Finished job streaming job 1421944033000 ms.1 from job set of time 1421944033000 ms 
15/01/22 21:57:13 INFO JobScheduler: Total delay: 0.028 s for time 1421944033000 ms (execution: 0.013 s) 
15/01/22 21:57:13 INFO MappedRDD: Removing RDD 5 from persistence list 
15/01/22 21:57:13 INFO BlockManager: Removing RDD 5 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO FileInputDStream: Cleared 0 old files that were older than 1421943973000 ms: 
15/01/22 21:57:13 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer() 

Sorun ait

public class StreamingWorkCount implements Serializable { 

    public static void main(String[] args) { 
     Logger.getLogger("org.apache.spark").setLevel(Level.WARN); 
     JavaStreamingContext jssc = new JavaStreamingContext(
       "spark://192.168.1.19:7077", "JavaWordCount", 
       new Duration(1000)); 
     JavaDStream<String> trainingData = jssc.textFileStream(
       "/home/bdi-user/kaushal-drive/spark/data/training").cache(); 
     trainingData.foreach(new Function<JavaRDD<String>, Void>() { 

      public Void call(JavaRDD<String> rdd) throws Exception { 
       List<String> output = rdd.collect(); 
       System.out.println("Sentences Collected from files " + output); 
       return null; 
      } 
     }); 

     trainingData.print(); 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 

Ve günlüğü i almıyorum veri dizininde olduğunu dosyası oluşturacak olmasıdır. Lütfen bana yardım et.

+0

Pencere makinesinde tam olarak aynı sorunla karşılaşır.Lütfen –

+0

önerdiğimi düşünüyorum, bu sadece HDFS'de çalışıyor ve yerel dosya sisteminde değil –

cevap

8

Başka bir dizinde deneyin ve iş çalışırken bu dosyaları bu dizine kopyalayın.

+0

evet, bunu başka bir direktifle denedim. Sorunun ne olduğunu ve kütükte nasıl hata ayıklandığını bile anlamadım. – Kaushal

+1

İşe başladığınızda dizin boş muydu? – pzecevic

+0

Aslında bazı dosyalar zaten var ve işime başladığımda bazı dosyaları da kopyalarım. – Kaushal

1

Yolunuzun önüne şemayı, yani file:// veya hdfs:// eklemeniz gerektiğini düşünüyorum. Aslında file:// olduğunu ve yolun "önünde" eklenmesi gerekiyor hdfs://, böylece toplam yol file:///tmp/file.txt veya hdfs:///user/data olur:


çünkü benim yorum düzenleme geri alınıyor. Yapılandırmada NameNode kümesi yoksa, ikincisinin hdfs://host:port/user/data olması gerekir.

+1

çalışır, ancak yerel dosya sistemi 'file: ///' (kıvılcım dosya: //) öneki ile kullandığımda çalışır, işe yaramıyor. – Kaushal

+1

Bunun nedeni, bir küme kullandığınız ve belirtilen yolun tüm Spark yürütücüler tarafından erişilebilir olması, yani, Spark sürücüsünün buna erişebilmesi yeterli değildir. – tgpfeiffer

3

aynı soruna sahipti.

hatlar = jssc.textFileStream ("file: /// Users/projeler/kıvılcım/test/veri ') İşte benim kodudur; TextFileSTream olduğunu

çok hassas;

1. Run Spark program 
2. touch datafile 
3. mv datafile datafile2 
4. mv datafile2 /Users/projects/spark/test/data 

ve bu did it

+0

Evet iyi çalıştı! – lihongxu

0

javadoc işlevi yalnızca yeni bir dosya akışları öneriyor: ne ben yapıyor sona oldu. s.

Ref: https://spark.apache.org/docs/1.0.1/api/java/org/apache/spark/streaming/api/java/JavaStreamingContext.html#textFileStream(java.lang.String)

yeni dosyalar için bir Hadoop uyumlu dosya sistemini izler ve (LongWritable, TextInputFormat olarak Metin ve giriş biçimi olarak değer olarak tuşunu kullanarak) Metin dosyaları olarak okur bir giriş akışı oluşturun. Dosyaları, aynı dosya sistemi içindeki başka bir konumdan "taşıyarak" izlenen dizine yazılmalıdır. Ile başlayan dosya adları. göz ardı edilir. klasördeki dosyalar veya güncellenmiş eklenen varlık olduğunda

0

textFileStream sadece bir klasörü izleyebilir.

Dosyaları okumak istiyorsanız, SparkContext.textFile'u kullanabilirsiniz.

0

Kıvılcım Aktarımının yalnızca dizindeki yeni dosyaları okuyacağına, güncellenmiş olanlara (dizinde olduklarında) ve hepsinin de aynı biçime sahip olduğuna dikkat etmelisiniz.

Source