Scala Sürüm 2.10.5 Cassandra 3.0 ve Spark 1.6 kullanıyorum. Temel ÖrnekCassandra Tabloya Veri Ekleme Spark DataFrame Kullanma
Works ve Cassandra.So içine mümkün eklemek bilgi Ben şemayıval person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
val schema = StructType(Array(StructField("firstName",StringType,true),StructField("lastName",StringType,true),StructField("age",IntegerType,true)))
val rowRDD = person.map(_.split(",")).map(p => org.apache.spark.sql.Row(p(0),p(1),p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra
eşleştirerek Cassandra tabloya eklemek için wan bir csv dosyası vardı
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))
Out çalıştı yüzden cassandra veri eklemek istiyorum
SaveToCassndra Iam kullanırken, saveToCassandra alıcısı personSchemaRDD'nin bir parçası değil. Yani farklı bir şekilde
df.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words_copy", "keyspace" -> "test")).save()
içinde çalışırken Ama ip üzerinde cassandra bağlanamıyor almanın öğretildiği: Herhangi biri bana bunu yapmanın en iyi şekilde anlatmak port.can. Dosyaları periyodik olarak cassandra'ya dosyalardan kaydetmem gerekiyor.
Satır() öğesindeki öğeden birini "val rowRDD = input.map (_. Split (", ")) olarak nasıl değiştirebilirim map (p => Satır (p (0), getTimestamp ((1))), p (2))) YYYY-AA-GG '' HH: mm: ss biçiminde – Anji
@Anji 'yi, "jodatime.DateTime"' in java.util.Date'üne zaman damgalarını eşleyerek daha iyi bir sonuç elde edersiniz Biçim sorunlarını önlemek için – maasg
com.databricks.spark.csv Kullanılırken "NA" hatamın alması için herhangi bir seçenek var. Nedeni: java.text.ParseException: Ayrıştırılamayan numara: "NA" ' – Anji