2015-10-21 11 views
7

Spark Scala'yı kullanarak bir CSV dosyasındaki tüm sütunların Histogramını hesaplamaya çalışıyorum.Büyük bir CSV/RDD'deki [Array [çift]] Apache Spark Scala kullanarak tüm sütunların histogramı nasıl alınır?

Histogram'ı destekleyen DoubleRDDFunctions'ı buldum. Tüm sütunların histogramını almak için takip ettiğim gibi kodlanmıştım.

  1. alın kolon
  2. DoubleRDDFunctions

    var columnIndexArray = Array.tabulate(rdd.first().length) (_ * 1) 
    
    val histogramData = columnIndexArray.map(columns => { 
        rdd.map(lines => lines(columns)).histogram(6) 
    }) 
    

iyi bir yolu mu kullanarak her RDD ait Histogram'ı her sütunun RDD[double] oluşturun ve hesaplamak saymak? Bunu halletmek için daha iyi yollar önerebilecek biri var mı?

Şimdiden teşekkürler.

cevap

5

Tam olarak daha iyi, ama alternatif bir yoludur olacak RDD'yi bir DataFrame'e dönüştürmek ve histogram_numeric UDF'yi kullanmaktır.

Örnek veriler:

import scala.util.Random 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.functions.{callUDF, lit, col} 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.hive.HiveContext 

val sqlContext = new HiveContext(sc) 

Random.setSeed(1) 

val ncol = 5 

val rdd = sc.parallelize((1 to 1000).map(
    _ => Row.fromSeq(Array.fill(ncol)(Random.nextDouble)) 
)) 

val schema = StructType(
    (1 to ncol).map(i => StructField(s"x$i", DoubleType, false))) 

val df = sqlContext.createDataFrame(rdd, schema) 
df.registerTempTable("df") 

Sorgu: öneri

val nBuckets = 3 
val columns = df.columns.map(
    c => callUDF("histogram_numeric", col(c), lit(nBuckets)).alias(c)) 
val histograms = df.select(columns: _*) 

histograms.printSchema 

// root 
// |-- x1: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x2: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x3: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x4: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 
// |-- x5: array (nullable = true) 
// | |-- element: struct (containsNull = true) 
// | | |-- x: double (nullable = true) 
// | | |-- y: double (nullable = true) 

histograms.select($"x1").collect() 

// Array([WrappedArray([0.16874313309969038,334.0], 
// [0.513382068667877,345.0], [0.8421388886903808,321.0])]) 
+1

Verme org.apache.spark.sql.AnalysisException: tanımlanmamış işlev histogram_numeric. Ben kıvılcım 1.5.1 kullanıyorum –

+0

UDFs HiveContext gerektirir. – zero323

+0

teşekkürler ... Cevabınızda değişken adını düzenledim. –

1

(scala API) dönüşümü, countByValue istediğini yapması gerektiğini

böylece örneği sizin RDD içinde ilk sütuna için histogram verilerini oluşturmak için: içinde

val histCol1 = RDD.map(record => record.col_1).countByValue() 

Yukarıdaki ifade, kaydı yalnızca RDD'deki bir veri satırına başvuruyor, bu, bir alan col_1

ve bu histCol1 bir karma tablo (Scala Harita) olan anahtarlar Kolon 1 (col_1) içinde benzersiz değerlerdir dönmek ve değerler açıkça benzersiz her değer frekansları

+0

teşekkür ederiz. Ama aynı zamanda kova boyutu vermem gerekiyor.Maksimum kovalar 10. countByValue() çift RDD histogramından daha verimli çalışacak mı? –

+0

"kepçe boyutu" aslında countByValue tarafından döndürüldü - her değer kepçenin boyutu, kepçenin adı – doug

+0

ise kepçe boyutunu bir değere sabitleyebilir miyiz? farklı saymayı düşünmek yerine. Tüm farklı sayılara ihtiyacım yok, histograma maksimum 10 kepçeyle ihtiyacım var. –