2016-04-02 8 views
5

ABD iç hat uçuşlarının 2015 yılına ait performans kayıtlarını analiz ediyorum. Kuyruk numarasına göre gruplandırmalıyım ve her bir kuyruk numarasına ait tüm uçuşların bir tarih sıralı listesini bir veritabanında, başvurum tarafından alınacak şekilde depolamalıyım. Bunu başarmak için hangi iki seçeneğin en iyisi olduğundan emin değilim.PySpark'ta sıralanmış bir küçülme yapmanın en etkili yolu nedir?

# Load the parquet file 
on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet') 

# Filter down to the fields we need to identify and link to a flight 
flights = on_time_dataframe.rdd.map(lambda x: 
    (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum) 
) 

azaltmak bu Doing

# Do same in a map step, more efficient or does pySpark know how to optimize the above? 
flights_per_airplane = flights\ 
    .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\ 
    .reduceByKey(lambda a, b: a + b)\ 
    .map(lambda tuple: 
    (
     tuple[0], sorted(tuple[1], key=lambda x: (x[1],x[2],x[3],x[4]))) 
    ) 

... Ben ... bir çeşit azaltmak içinde

# Group flights by tail number, sorted by date, then flight number, then 
origin/dest 
flights_per_airplane = flights\ 
    .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\ 
    .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1],x[2],x[3],x[4]))) 

bunu başarabilirsiniz Ya bir sonraki harita işinde bunu elde edebilirsiniz Gerçekten verimsiz görünüyor, ama aslında her ikisi de çok yavaş. sıralanmış() PySpark belgelerinde bunu yapma şekline benziyor, bu yüzden PySpark bu koşucuyu dahili olarak yapmıyor mu? Başka bir nedenden hangisi en verimli veya en iyi seçenek?

Kodum burada ve özü de şudur: verilerle merak ediyorsanız https://gist.github.com/rjurney/af27f70c76dc6c6ae05c465271331ade

, burada Ulaştırma İstatistikleri Bürosu, dan: http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time

cevap

3

Maalesef her iki yönde senden önce yanlış sıralama yapmaya bile başlıyoruz ve bunu Spark'da yapmanın etkili ve basit bir yolu yoktur. Yine de, ilki diğerinden çok daha kötüdür.

Neden her iki yöntem yanlış? Çünkü bu sadece bir başka groupByKey ve sadece pahalı bir işlem. Bazı şeyleri iyileştirmeye çalışmanın bazı yolları vardır (özellikle harita tarafındaki düşüşten kaçınmak için), ancak günün sonunda tam bir karıştırmanın bedelini ödemek zorundasınız ve herhangi bir başarısızlık görmüyorsanız muhtemelen değmez. tüm yaygara Yine de, ikinci yaklaşım algoritmik olarak çok daha iyidir *. İlk denemede olduğu gibi sıralı bir yapıya sahip olmak istiyorsanız, araçlara (bisect.insort ile aggregateByKey iyi bir seçim olacaktır) sahip olmalısınız, ancak burada kazanılacak hiçbir şey yoktur.

Gruplandırılmış çıktı zor bir gereksinimse, yapabileceğiniz en iyi şey keyBy, groupByKey ve sort. İkinci çözüm üzerinde performansını artırmak olmaz ama belki okunabilirliği artıracaktır:

(flights 
    .keyBy(lambda x: x[5]) 
    .groupByKey() 
    .mapValues(lambda vs: sorted(vs, key=lambda x: x[1:5]))) 
Eğer Timsort birinci yaklaşım için iyi senaryo N kez O (K) süre olduğunu varsaysak bile

* ikincisi, en kötü durum senaryosunda O (N log N)'dur.

+0

Sadece bu soruya doğru bir cevap beklemediğimi söylemek istedim ve gerçekten verdiğiniz şey harika! – rjurney