2016-12-20 30 views
5

Scala Sürüm 2.10.5 Cassandra 3.0 ve Spark 1.6 kullanıyorum. Temel ÖrnekCassandra Tabloya Veri Ekleme Spark DataFrame Kullanma

Works ve Cassandra.So içine mümkün eklemek bilgi Ben şemayı

val person = sc.textFile("hdfs://localhost:9000/user/hduser/person") 
import org.apache.spark.sql._ 
val schema = StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true))) 
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt)) 
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema) 
personSchemaRDD.saveToCassandra 
eşleştirerek Cassandra tabloya eklemek için wan bir csv dosyası vardı
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40))) 
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count")) 

Out çalıştı yüzden cassandra veri eklemek istiyorum

SaveToCassndra Iam kullanırken, saveToCassandra alıcısı personSchemaRDD'nin bir parçası değil. Yani farklı bir şekilde

df.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words_copy", "keyspace" -> "test")).save() 

içinde çalışırken Ama ip üzerinde cassandra bağlanamıyor almanın öğretildiği: Herhangi biri bana bunu yapmanın en iyi şekilde anlatmak port.can. Dosyaları periyodik olarak cassandra'ya dosyalardan kaydetmem gerekiyor.

cevap

4

sqlContext.applySchema(...), DataFrame değerini döndürür ve DataFrame, saveToCassandra yöntemine sahip değildir.

Yapabilirsin onunla .write yöntemi:

val personDF = sqlContext.applySchema(rowRDD, schema) 
personDF.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words_copy", "keyspace" -> "test")).save() 

biz savetoCassandra yöntemi kullanmak istiyorsanız, en iyi yolu bir vaka sınıfını kullanarak, şema farkında RDD sahip olmaktır.

case class Person(firstname:String, lastName:String, age:Int) 
val rowRDD = person.map(_.split(",")).map(p => Person(p(0),p(1),p(2).toInt) 
rowRDD.saveToCassandra(keyspace, table) 

Dataframe write yöntem çalışması gerekir. İçeriğinizi doğru bir şekilde yapılandırdığınızdan emin olun.

+0

Satır() öğesindeki öğeden birini "val rowRDD = input.map (_. Split (", ")) olarak nasıl değiştirebilirim map (p => Satır (p (0), getTimestamp ((1))), p (2))) YYYY-AA-GG '' HH: mm: ss biçiminde – Anji

+0

@Anji 'yi, "jodatime.DateTime"' in java.util.Date'üne zaman damgalarını eşleyerek daha iyi bir sonuç elde edersiniz Biçim sorunlarını önlemek için – maasg

+0

com.databricks.spark.csv Kullanılırken "NA" hatamın alması için herhangi bir seçenek var. Nedeni: java.text.ParseException: Ayrıştırılamayan numara: "NA" ' – Anji