2015-11-25 19 views
8

Birden çok sütun içeren org.apache.spark.sql.DataFrame sahibim. I -1 ve 1 arasında veri ölçek ve StandardScaler bulunan MinMax Normalleştirmede scala

scala> val df = sqlContext.csvFile("tenop.csv") 
df: org.apache.spark.sql.DataFrame = [gst_id_matched: string, 
    ip_crowding: string, lat_long_dist: double, stream_name_1: string] 

org.apache.spark.sql.DataFrame

gibi veri türü korumak için minmaks Normalizasyon veya herhangi bir teknik kullanılarak 1 kolonu (lat_long_dist) ölçeklendirmek için isteyen Bu seçenek, transformasyonu yapmadan önce veri kümesini dönüştürmeyi gerektirir. Basit bir şekilde temiz bir yol var.

cevap

9

Ben ne istediğini zaten Spark ile oynarken Burada başka öneri bu

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.{min, max, lit} 

val df = sc.parallelize(Seq(
    (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3) 
)).toDF("k", "v") 

val (vMin, vMax) = df.agg(min($"v"), max($"v")).first match { 
    case Row(x: Double, y: Double) => (x, y) 
} 

val scaledRange = lit(2) // Range of the scaled variable 
val scaledMin = lit(-1) // Min value of the scaled variable 
val vNormalized = ($"v" - vMin)/(vMax - vMin) // v normalized to (0, 1) range 

val vScaled = scaledRange * vNormalized + scaledMin 

df.withColumn("vScaled", vScaled).show 

// +---+-----+--------------------+ 
// | k| v|    vScaled| 
// +---+-----+--------------------+ 
// | 1| 0.5| -0.3093093093093092| 
// | 2| 10.2| 0.27327327327327344| 
// | 3| 5.7|0.003003003003003...| 
// | 4|-11.0|    -1.0| 
// | 5| 22.3|     1.0| 
// +---+-----+--------------------+ 
11

gibi bir şey sanırım.

Neden MinMaxScaler'ı ml paketinde kullanmıyorsunuz?

Bunu, aynı örnekle sıfır323'den deneyelim.

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.ml.feature.MinMaxScaler 
import org.apache.spark.sql.functions.udf 

val df = sc.parallelize(Seq(
    (1L, 0.5), (2L, 10.2), (3L, 5.7), (4L, -11.0), (5L, 22.3) 
)).toDF("k", "v") 

//val df.map(r => Vectors.dense(Array(r.getAs[Double]("v")))) 

val vectorizeCol = udf((v:Double) => Vectors.dense(Array(v))) 
val df2 = df.withColumn("vVec", vectorizeCol(df("v")) 

val scaler = new MinMaxScaler() 
    .setInputCol("vVec") 
    .setOutputCol("vScaled") 
    .setMax(1) 
    .setMin(-1) 

scaler.fit(df2).transform(df2).show 
+---+-----+-------+--------------------+ 
| k| v|  vv|     vs| 
+---+-----+-------+--------------------+ 
| 1| 0.5| [0.5]|[-0.3093093093093...| 
| 2| 10.2| [10.2]|[0.27327327327327...| 
| 3| 5.7| [5.7]|[0.00300300300300...| 
| 4|-11.0|[-11.0]|    [-1.0]| 
| 5| 22.3| [22.3]|    [1.0]| 
+---+-----+-------+--------------------+ 

Aynı anda birden çok sütunun ölçeklendirilmesinden yararlanın.

val df = sc.parallelize(Seq(
    (1.0, -1.0, 2.0), 
    (2.0, 0.0, 0.0), 
    (0.0, 1.0, -1.0) 
)).toDF("a", "b", "c") 

import org.apache.spark.ml.feature.VectorAssembler 

val assembler = new VectorAssembler() 
    .setInputCols(Array("a", "b", "c")) 
    .setOutputCol("features") 

val df2 = assembler.transform(df) 

// Reusing the scaler instance above with the same min(-1) and max(1) 
scaler.setInputCol("features").setOutputCol("scaledFeatures").fit(df2).transform(df2).show 
+---+----+----+--------------+--------------------+ 
| a| b| c|  features|  scaledFeatures| 
+---+----+----+--------------+--------------------+ 
|1.0|-1.0| 2.0|[1.0,-1.0,2.0]|  [0.0,-1.0,1.0]| 
|2.0| 0.0| 0.0| [2.0,0.0,0.0]|[1.0,0.0,-0.33333...| 
|0.0| 1.0|-1.0|[0.0,1.0,-1.0]|  [-1.0,1.0,-1.0]| 
+---+----+----+--------------+--------------------+ 
+0

Mükemmel cevap, Bu benim için çok zaman kazandırıyor :) –

+0

Size yardımcı olunMostafaAlaa – Lyle