2017-02-18 52 views
12

Böyle üç sütun değerleri tarafından bölümlenmiş, DataFrameWriter kullanarak Parke formatında HDF'ler bir DataFrame kurtarmaya çalışıyorum:Yeni veriler olmadan bölümleri silmeden Spark'de DataFrame nasıl bölümlenir ve yazılır?

this question belirtildiği gibi
dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path) 

, partitionBypath de bölümleri dolu varolan hiyerarşiyi silecektir ve bunları, dataFrame numaralı bölümlerle değiştirdiler. Belirli bir güne ait yeni artımlı veriler periyodik olarak geleceğinden, istediklerim yalnızca hiyerarşideki bölümleri değiştirmektir, bu nedenle dataFrame'un verileri el değmeden bırakılır.

ben bireysel olarak tam yolunu, böyle bir şey kullanarak her bölüm kaydetmeniz gerekir görünen Bunun için:

singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890") 

Ancak sorun tek bölüm halinde verileri düzenlemek için en iyi yolu anlama yaşıyorum DataFrame s, böylece onları tam yollarını kullanarak yazabilirim.

dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ... 

Ama foreachPartition Parke biçimine dışarı yazma için ideal değildir bir Iterator[Row] çalışır: Bir fikir bir şey gibiydi.

Ayrıca bölümlerin listesini almak ve daha sonra bu bölümlerin her biri tarafından özgün veri çerçevesini filtrelemek ve sonuçları tam bölümlenmiş yollarına kaydetmek için bir select...distinct eventdate, hour, processtime kullanmayı düşündüm. Ancak, her bir bölüm için ayrı bir sorgu ve bir filtre, çok fazla filtre/yazma işlemi olacağından çok verimli görünmüyor.

dataFrame'un veri içermediği mevcut bölümleri korumanın daha temiz bir yolu var mıdır?

Okumak için teşekkürler.

Spark sürümü: 2.1

cevap

0

Ek olarak modu deneyebilirsiniz.

dataFrame.write.format("parquet") 
.mode("append") 
.partitionBy("year","month") 
.option("path",s"$path/table_name") 
.saveAsTable(s"stg_table_name") 
1

Mod seçeneği Append'da bir yakalama var!

df.write.partitionBy("y","m","d") 
.mode(SaveMode.Append) 
.parquet("/data/hive/warehouse/mydbname.db/" + tableName) 

Test ettim ve bunun mevcut bölüm dosyalarını saklayacağını gördüm. Ancak, bu sefer sorun şu: Eğer aynı kodu iki kez (aynı verilerle) çalıştırıyorsanız, aynı veriler için mevcut olanları değiştirmek yerine yeni parke dosyaları yaratacaktır (Spark 1.6). Yani, Append kullanmak yerine, bu sorunu yine de Overwrite ile çözebiliriz. Tablo seviyesinde üzerine yazmak yerine, bölüm seviyesinde üzerine yazmamız gerekir.

df.write.mode(SaveMode.Overwrite) 
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day) 

fazla bilgi için aşağıdaki linke bakınız:

Overwrite specific partitions in spark dataframe write method

(ı suriyanto yorumuna sonra benim cevap güncelledik Thnx..) Aynı yazarken eğer

+0

sen test ettin Eski bölümün yerini iki kez veri? Testimden, aslında bölüm dizininde verilerin ikiye katlanmasına neden olan yeni bir parke dosyası oluşturuyor. Kıvılcım 2.2'deyim. – suriyanto