Bazı grup numaralarına göre etiketlenmiş vektörleri (LabeledPoint-ler) etiketledim. Her grup için ben ayrı Lojistik Regresyon sınıflandırıcı oluşturmak gerekir: Spark MLlib: her veri grubu için bina sınıflandırıcıları
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object Scratch {
val train = Seq(
(1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
(1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
(1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
)
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// set up environment
val conf = new SparkConf()
.setMaster("local[5]")
.setAppName("Scratch")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
val trainRDD = sc.parallelize(train)
val modelByGroup = trainRDD.groupByKey().map({case (group, iter) =>
(group, new LogisticRegressionWithLBFGS().run(iter))})
}
}
LogisticRegressionWithLBFGS().run(iter)
RDD
ile
run
eserler nedeniyle değil, Yineleyici o
groupBy
döner derleme değil. Lütfen giriş verilerinde gruplar (etiketler) olduğu için birçok sınıflandırıcıyı nasıl oluşturulacağını önerin. -
Güncelleme iç içe RDD yineleme çalışmadığını gösterir: diğer dönüşümler içine dönüşümleri kullanmanın yolu, doğru var gibi
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}
object Scratch {
val train = Seq(
(1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
(1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
(1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
(8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
)
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// set up environment
val conf = new SparkConf()
.setMaster("local[5]")
.setAppName("Scratch")
.set("spark.executor.memory", "2g")
val sc = new SparkContext(conf)
val trainRDD = sc.parallelize(train)
val keys : RDD[Int] = trainRDD.map({case (key,_) => key}).distinct
for (key <- keys) {
// key is Int here!
// Get train data for the current group (key):
val groupTrain = trainRDD.filter({case (x, _) => x == key }).cache()
/**
* Which results in org.apache.spark.SparkException:
* RDD transformations and actions can only be invoked by the driver,
* not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid
* because the values transformation and count action cannot be performed inside of the rdd1.map transformation.
* For more information, see SPARK-5063. at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
*/
}
}
}
görünüyor?
Ben büyük veri var ben Weka kullanamazsınız yüzden grup için sınıflandırıcı eğitmek için ayarlar. Başka fikirlerin var mı? – zork
@zork Çok fazla grup yoksa, her grup için bir tane de rdd oluşturabilirsiniz. Sadece 'oneKeyRDD = wholeTrainSetRDD.filter (_._ 1 == tuşu)'. – abalcerek
Evet, bunu da düşündüm. Ne yazık ki 30 grubum var! – zork