2015-07-16 15 views
5

Bazı grup numaralarına göre etiketlenmiş vektörleri (LabeledPoint-ler) etiketledim. Her grup için ben ayrı Lojistik Regresyon sınıflandırıcı oluşturmak gerekir: Spark MLlib: her veri grubu için bina sınıflandırıcıları

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.mllib.linalg.{Vector, Vectors} 

object Scratch { 

    val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))), 
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))), 
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0))))) 
) 

    def main(args: Array[String]) { 
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN) 
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 
    // set up environment 
    val conf = new SparkConf() 
     .setMaster("local[5]") 
     .setAppName("Scratch") 
     .set("spark.executor.memory", "2g") 
    val sc = new SparkContext(conf) 

    val trainRDD = sc.parallelize(train) 
    val modelByGroup = trainRDD.groupByKey().map({case (group, iter) => 
          (group, new LogisticRegressionWithLBFGS().run(iter))}) 
    } 

} 

LogisticRegressionWithLBFGS().run(iter)

RDD ile run eserler nedeniyle değil, Yineleyici o groupBy döner derleme değil. Lütfen giriş verilerinde gruplar (etiketler) olduğu için birçok sınıflandırıcıyı nasıl oluşturulacağını önerin. -

Güncelleme iç içe RDD yineleme çalışmadığını gösterir: diğer dönüşümler içine dönüşümleri kullanmanın yolu, doğru var gibi

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS 
import org.apache.spark.rdd.RDD 
import org.apache.spark.{SparkContext, SparkConf} 
import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.mllib.linalg.{Vector, Vectors} 

object Scratch { 

    val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))), 
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))), 
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))), 
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0))))) 
) 

    def main(args: Array[String]) { 
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN) 
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 
    // set up environment 
    val conf = new SparkConf() 
     .setMaster("local[5]") 
     .setAppName("Scratch") 
     .set("spark.executor.memory", "2g") 
    val sc = new SparkContext(conf) 

    val trainRDD = sc.parallelize(train) 
    val keys : RDD[Int] = trainRDD.map({case (key,_) => key}).distinct 
    for (key <- keys) { 
    // key is Int here! 
     // Get train data for the current group (key): 
     val groupTrain = trainRDD.filter({case (x, _) => x == key }).cache() 

     /** 
     * Which results in org.apache.spark.SparkException: 
     * RDD transformations and actions can only be invoked by the driver, 
     * not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid 
     * because the values transformation and count action cannot be performed inside of the rdd1.map transformation. 
     * For more information, see SPARK-5063. at org.apache.spark.rdd.RDD.sc(RDD.scala:87) 
     */ 
    } 
    } 
} 

görünüyor?

cevap

3

Her bir gruptaki kullanım sınıflandırıcınız mllib gerektirmez. Mllib, dağıtılmış kümelerle kullanılmak üzere tasarlanmıştır (kümeleriniz, her bir işçiye yerel kümeleriniz yok). Harita işlevinde her grupta weka gibi bazı yerel makine öğrenme kitaplığını kullanabilirsiniz.

DÜZENLEME: Her grupta

val keys = wholeRDD.map(_._1).distinct.collect 

var models = List() 
for (key <- keys) { 
    val valuesForKey = wholeRDD.filter(_._1 == key) 
    // train model 
    ... 
    models = model::models 
} 
+0

Ben büyük veri var ben Weka kullanamazsınız yüzden grup için sınıflandırıcı eğitmek için ayarlar. Başka fikirlerin var mı? – zork

+0

@zork Çok fazla grup yoksa, her grup için bir tane de rdd oluşturabilirsiniz. Sadece 'oneKeyRDD = wholeTrainSetRDD.filter (_._ 1 == tuşu)'. – abalcerek

+0

Evet, bunu da düşündüm. Ne yazık ki 30 grubum var! – zork