2016-04-25 18 views
7

tüm data types in pyspark.sql.types are:PySpark'ta bir UDF'de "Tuple tipi" nasıl geri dönülür?

__all__ = [ 
    "DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType", 
    "TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType", 
    "LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"] 

I dizilerini bir dizi döndürür (pyspark olarak) bir UDF yazmak zorunda. Udf yönteminin dönüş türü olan ikinci argümana ne vereceğim? Spark'de ...

+0

Başlığınız, gövde ile eşleşmiyor gibi görünüyor. Dokümantasyon size bir dönüş değeri nasıl * * "diğer tipte konteyner tipi" * ayarlanacağını söylemiyor mu? – jonrsharpe

+0

@jonrsharpe Başlığı değiştirdim. Umarım şu anda vücudun temsilcisi. – kamalbanga

cevap

11

satırlarında bir şey olurdu TupleType diye bir şey yoktur. Ürün türleri, belirli tipte alanlarla structs olarak temsil edilir.

from pyspark.sql.functions import udf 
from collections import Counter 

char_count_udf = udf(
    lambda s: Counter(s).most_common(), 
    schema 
) 

df = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["id", "value"]) 

df.select("*", char_count_udf(df["value"])).show(2, False) 

## +---+-----+-------------------------+ 
## |id |value|PythonUDF#<lambda>(value)| 
## +---+-----+-------------------------+ 
## |1 |foo |[[o,2], [f,1]]   | 
## |2 |bar |[[r,1], [a,1], [b,1]] | 
## +---+-----+-------------------------+ 
+0

Cevabınız çalışıyor, fakat benim durumum biraz karmaşık. İade verilerim, [('a1', [('b1', 1), ('b2', 2)]), ('a2', [('b1', 1), ('b2) türündedir ', 2)])] 've bu yüzden ben ArrayType (StructType (" StructField ("tarih", StringType(), False), ArrayType (StructType ([StructField ("hashId", StringType(), Yanlış) olarak bir tür yapıyorum), StructField ("TimeSpent-Front", FloatType(), False), StructField ("TimeSpent-Back", FloatType(), False)]))])) '** 'ArrayType' nesnesinin herhangi bir özelliği yoktur ' '** ... – kamalbanga

+1

'StructType' ifadesi' StructFields' dizisini gerektirir, dolayısıyla' ArrayTypes 'özelliğini tek başına kullanamazsınız. ArrayType'ı depolayan 'StructField''a ihtiyacınız var. Ayrıca tavsiye kelimesi - kendinizi bu gibi yapılar oluştururken bulursanız muhtemelen veri modelini yeniden düşünmelisiniz. Derinlemesine yuvalanmış yapıların UDF'ler olmadan işlenmesi zor ve Python UDF'leri verimli olmaktan uzaktır. – zero323

+0

Bir listeyi döndürmek için udf'deki şemayı nasıl belirleyebilirim. F.udf (lambda start_date, end_date: [0,1] eğer başlangıç_tarihi pseudocode

4

Stackoverflow bu soruya beni yönlendiren tutar, bu yüzden:

from pyspark.sql.types import * 

schema = ArrayType(StructType([ 
    StructField("char", StringType(), False), 
    StructField("count", IntegerType(), False) 
])) 

Örnek kullanım: Eğer çiftleri dizisi dönmek isterseniz, örneğin (tamsayı, string) için böyle şemayı kullanabilirsiniz sanırım buraya biraz bilgi ekleyeceğim.

UDF basit türlerini dönersek:

df = get_df() 

func = udf(lambda x: [0]*int(x), ArrayType(IntegerType())) 
df = df.withColumn('list', func('y')) 

func = udf(lambda x: {float(y): str(y) for y in range(int(x))}, 
      MapType(FloatType(), StringType())) 
df = df.withColumn('map', func('y')) 

df.show() 
# +---+---+--------------------+--------------------+ 
# | x| y|    list|     map| 
# +---+---+--------------------+--------------------+ 
# |0.0|0.0|     []|    Map()| 
# |0.0|3.0|   [0, 0, 0]|Map(2.0 -> 2, 0.0...| 
# |1.0|6.0| [0, 0, 0, 0, 0, 0]|Map(0.0 -> 0, 5.0...| 
# |1.0|9.0|[0, 0, 0, 0, 0, 0...|Map(0.0 -> 0, 5.0...| 
# +---+---+--------------------+--------------------+ 

df.printSchema() 
# root 
# |-- x: double (nullable = true) 
# |-- y: double (nullable = true) 
# |-- list: array (nullable = true) 
# | |-- element: integer (containsNull = true) 
# |-- map: map (nullable = true) 
# | |-- key: float 
# | |-- value: string (valueContainsNull = true) 

UDF karmaşık veri türlerini dönersek:

df = get_df() 
df = df.groupBy('x').agg(F.collect_list('y').alias('y[]')) 
df.show() 

# +---+----------+ 
# | x|  y[]| 
# +---+----------+ 
# |0.0|[0.0, 3.0]| 
# |1.0|[9.0, 6.0]| 
# +---+----------+ 

schema = StructType([ 
    StructField("min", FloatType(), True), 
    StructField("size", IntegerType(), True), 
    StructField("edges", ArrayType(FloatType()), True), 
    StructField("val_to_index", MapType(FloatType(), IntegerType()), True) 
    # StructField('insanity', StructType([StructField("min_", FloatType(), True), StructField("size_", IntegerType(), True)])) 

]) 

def func(values): 
    mn = min(values) 
    size = len(values) 
    lst = sorted(values)[::-1] 
    val_to_index = {x: i for i, x in enumerate(values)} 
    return (mn, size, lst, val_to_index) 

func = udf(func, schema) 
dff = df.select('*', func('y[]').alias('complex_type')) 
dff.show(10, False) 

# +---+----------+------------------------------------------------------+ 
# |x |y[]  |complex_type           | 
# +---+----------+------------------------------------------------------+ 
# |0.0|[0.0, 3.0]|[0.0,2,WrappedArray(3.0, 0.0),Map(0.0 -> 0, 3.0 -> 1)]| 
# |1.0|[6.0, 9.0]|[6.0,2,WrappedArray(9.0, 6.0),Map(9.0 -> 1, 6.0 -> 0)]| 
# +---+----------+------------------------------------------------------+ 

dff.printSchema() 

# +---+----------+------------------------------------------------------+ 
# |x |y[]  |complex_type           | 
# +---+----------+------------------------------------------------------+ 
# |0.0|[0.0, 3.0]|[0.0,2,WrappedArray(3.0, 0.0),Map(0.0 -> 0, 3.0 -> 1)]| 
# |1.0|[6.0, 9.0]|[6.0,2,WrappedArray(9.0, 6.0),Map(9.0 -> 1, 6.0 -> 0)]| 
# +---+----------+------------------------------------------------------+ 

bir UDF için birden argümanları iletme

from pyspark.sql.types import * 
from pyspark.sql import functions as F 

def get_df(): 
    d = [(0.0, 0.0), (0.0, 3.0), (1.0, 6.0), (1.0, 9.0)] 
    df = sqlContext.createDataFrame(d, ['x', 'y']) 
    return df 

df = get_df() 
df.show() 

# +---+---+ 
# | x| y| 
# +---+---+ 
# |0.0|0.0| 
# |0.0|3.0| 
# |1.0|6.0| 
# |1.0|9.0| 
# +---+---+ 

func = udf(lambda x: str(x), StringType()) 
df = df.withColumn('y_str', func('y')) 

func = udf(lambda x: int(x), IntegerType()) 
df = df.withColumn('y_int', func('y')) 

df.show() 

# +---+---+-----+-----+ 
# | x| y|y_str|y_int| 
# +---+---+-----+-----+ 
# |0.0|0.0| 0.0| 0| 
# |0.0|3.0| 3.0| 3| 
# |1.0|6.0| 6.0| 6| 
# |1.0|9.0| 9.0| 9| 
# +---+---+-----+-----+ 

df.printSchema() 

# root 
# |-- x: double (nullable = true) 
# |-- y: double (nullable = true) 
# |-- y_str: string (nullable = true) 
# |-- y_int: integer (nullable = true) 

tamsayılar yetmez ise:

df = get_df() 
func = udf(lambda arr: arr[0]*arr[1],FloatType()) 
df = df.withColumn('x*y', func(F.array('x', 'y'))) 

    # +---+---+---+ 
    # | x| y|x*y| 
    # +---+---+---+ 
    # |0.0|0.0|0.0| 
    # |0.0|3.0|0.0| 
    # |1.0|6.0|6.0| 
    # |1.0|9.0|9.0| 
    # +---+---+---+ 

Bu kod yalnızca demo amaçlıdır, yukarıdaki tüm dönüşümler Spark kodunda bulunur ve çok daha iyi bir performans sağlar. Yukarıdaki yorumda @ 03232 gibi, UDF'ler genellikle pysparkta önlenmelidir; Geri dönen karmaşık türler, mantığınızı basitleştirmeyi düşünmenizi sağlamalıdır.