6

İki veri çerçevesi arasında dış birleştirmeyi yaptığım bir kıvılcım var. İlk veri çerçevesinin boyutu 260 GB'dir, dosya formatı 2200 dosyaya bölünmüş metin dosyalarıdır ve ikinci veri çerçevesinin boyutu 2GB'dir.EMR'de kıvılcım işini hızlıca yazmak için nasıl yapılır S3

Bu iki dosyayı veri çerçevesine yüklemek 10 dakika sürer.

Daha sonra S3'e yaklaşık 260 GB olan veri çerçeve çıktısının yazılması yaklaşık 1 saat sürer.

İşte küme bilgilerim. Burada

emr-5.9.0 
Master:1m3.2xlarge 
Core:c3.4large 5 machines 

Bu benim kod ben burada

[ 
    { 
     "Classification": "spark-defaults" 
     , "Properties": { 
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer" 
     } 
    },{ 
    "Classification": "emrfs-site", 
    "Properties": { 
     "fs.s3.maxConnections": "200" 
    } 
    } 
] 

kuruyorum benim küme yapılandırma olduğunu

CPU:16 
RAM:30 
DISK:2 × 160 GB SSD 

Zeppelin 0.7.2, Spark 2.2.0, Ganglia 3.7.2 

her c3.4xlarge makinesinin detay

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    import sqlContext.implicits._ 

    import org.apache.spark.{ SparkConf, SparkContext } 
    import java.sql.{Date, Timestamp} 
    import org.apache.spark.sql.Row 
    import org.apache.spark.sql.types._ 
    import org.apache.spark.sql.functions.udf 
    import java.io.File 
    import org.apache.hadoop.fs._ 



import org.apache.spark.sql.functions.input_file_name 
import org.apache.spark.sql.functions.regexp_extract 


val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3)) 
val get_cus_valYear = spark.udf.register("get_cus_valYear", (filePath: String) => filePath.split("\\.")(4)) 


val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load(("s3://trfsdisu/SPARK/FundamentalAnalytic/MAIN")) 

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*) 
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*) 

val df1resultFinal=df1result.withColumn("DataPartition", get_cus_val(input_file_name)) 
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_valYear(input_file_name)) 


val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FundamentalAnalytic/INCR") 
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*) 
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c42"))).toSeq 
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*) 

import org.apache.spark.sql.expressions._ 
val windowSpec = Window.partitionBy("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp") 


val dfMainOutput = df1resultFinalWithYear.join(latestForEachKey, Seq("FundamentalSeriesId", "financialPeriodEndDate","financialPeriodType","sYearToDate","lineItemId"), "outer") 
     .select($"FundamentalSeriesId", 
     when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"), 
     when($"PartitionYear_1".isNotNull, $"PartitionYear_1").otherwise($"PartitionYear".cast(DataTypes.StringType)).as("PartitionYear"), 
     when($"FundamentalSeriesId_objectTypeId_1".isNotNull, $"FundamentalSeriesId_objectTypeId_1").otherwise($"FundamentalSeriesId_objectTypeId".cast(DataTypes.StringType)).as("FundamentalSeriesId_objectTypeId"), 
     when($"analyticItemInstanceKey_1".isNotNull, $"analyticItemInstanceKey_1").otherwise($"analyticItemInstanceKey").as("analyticItemInstanceKey"), 
     when($"AnalyticValue_1".isNotNull, $"AnalyticValue_1").otherwise($"AnalyticValue").as("AnalyticValue"), 
     when($"AnalyticConceptCode_1".isNotNull, $"AnalyticConceptCode_1").otherwise($"AnalyticConceptCode").as("AnalyticConceptCode"), 
     $"AuditID_1").otherwise($"AuditID").as("AuditID"), 
     when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"), 
     when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction")) 
     .filter(!$"FFAction".contains("D")) 



    val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated")) 

    val dfMainOutputWithoutFinalYear = dfMainOutputFinal.select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutputFinal.schema.fieldNames.filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated")) 

    val headerColumn = df.columns.filter(v => (!v.contains("^") && !v.contains("_c42"))).toSeq 

    val header = headerColumn.dropRight(1).mkString("", "|^|", "|!|") 

    val dfMainOutputFinalWithoutNull = dfMainOutputWithoutFinalYear.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header) 
    dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition","PartitionYear") 
     .format("csv") 
     .option("nullValue", "") 
     .option("header", "true") 
     .option("codec", "gzip") 
     .save("s3://trfsdisu/SPARK/FundamentalAnalytic/output") 

I Buna ek olarak, önce HDFS'ye veri yazmayı denedim, ancak aynı zamanda HDFS dizinine yazmak için de aynı zaman alıyor (S3'ten 4 dakika daha az).

enter image description here

Tüm Sahne DAG enter image description here

DAG

ekleme SQL Fiziksel planını ekleme

enter image description here

Bellek İşimin son saat kullanılmış

012 İşte

enter image description here

bazı iş günlükleri

I looking into the job execution on the below clusters .Here were some on my observations : 

Majority time is being consumed at RDD getting spilling in the disk . 

######### 

17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:37 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:41:38 INFO UnsafeExternalSorter: Thread 91 spilling sort data of 704.0 MB to disk (0 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 88 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:17 INFO UnsafeExternalSorter: Thread 90 spilling sort data of 704.0 MB to disk (1 time so far) 
17/10/17 15:42:33 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 704.0 MB to disk (1 time so far) 

######### 

This is causing spilling of 700MB memory pages constantly on the disk and then reading it back before shuffle phase . The same is see in all the containers ran for the job . The reason why lot of spilling is happening is because the executor are launched in a container with size : 

######################### 
    17/10/17 15:20:18 INFO YarnAllocator: Will request 1 executor container(s), each with 4 core(s) and 5632 MB memory (including 512 MB of overhead) 
######################### 

Which means each containers are on 5GB and hence they are getting full very quickly .and because of memory pressure they are getting spilled . 

You will notice the same in the nodemanager Logs : 

2017-10-17 15:58:21,590 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 7759 for container-id container_1508253273035_0001_01_000003: 5.0 GB of 5.5 GB physical memory used; 8.6 GB of 27.5 GB virtual memory used 
+0

Web UI'den mantıksal planları gösterin. Bana/insanlara kodun ne yaptığına dair fikir vermek için ekran görüntüleri yapın. Teşekkürler. –

+0

Spark'in web kullanıcı arayüzünden gelen sorgular için fiziksel planlar sordum. Bu, sorgularınızın tam olarak ne yaptığını bilmeme yardımcı olmalı. –

+0

@JacekLaskowski Ah alright .. Gangelia'dakileri görebilir miyim? – SUDARSHAN

cevap

1

Sen RAM her 30 GB olan, beş c3.4large EC2 örneklerini çalıştıran olduğunu. Yani, toplamda sadece 150GB,> 200GB veri kümenizden çok daha küçüktür. Bu nedenle çok miktarda disk dökülmesi. Belki bunun yerine r tipi EC2 örneklerini (bellek için optimize edilmiş hesaplamanın türü olarak optimize edilmiş bellek) çalıştırabilir ve performans iyileştirme olup olmadığını görebilirsiniz.

+0

r4.4xlarge kullanmasam bile, hemen hemen aynı zaman alır ...Kullanılan V çekirdek hala 1 – SUDARSHAN

+1

@SUDARSHAN bu özelliği küme yapılandırmanıza ekleyin: [{"classification": "spark", "properties": {"maximizeResourceAllocation": "true"}}] ' – Will