Stackoverflow bu soruya beni yönlendiren tutar, bu yüzden:
from pyspark.sql.types import *
schema = ArrayType(StructType([
StructField("char", StringType(), False),
StructField("count", IntegerType(), False)
]))
Örnek kullanım: Eğer çiftleri dizisi dönmek isterseniz, örneğin (tamsayı, string) için böyle şemayı kullanabilirsiniz sanırım buraya biraz bilgi ekleyeceğim.
UDF basit türlerini dönersek:
df = get_df()
func = udf(lambda x: [0]*int(x), ArrayType(IntegerType()))
df = df.withColumn('list', func('y'))
func = udf(lambda x: {float(y): str(y) for y in range(int(x))},
MapType(FloatType(), StringType()))
df = df.withColumn('map', func('y'))
df.show()
# +---+---+--------------------+--------------------+
# | x| y| list| map|
# +---+---+--------------------+--------------------+
# |0.0|0.0| []| Map()|
# |0.0|3.0| [0, 0, 0]|Map(2.0 -> 2, 0.0...|
# |1.0|6.0| [0, 0, 0, 0, 0, 0]|Map(0.0 -> 0, 5.0...|
# |1.0|9.0|[0, 0, 0, 0, 0, 0...|Map(0.0 -> 0, 5.0...|
# +---+---+--------------------+--------------------+
df.printSchema()
# root
# |-- x: double (nullable = true)
# |-- y: double (nullable = true)
# |-- list: array (nullable = true)
# | |-- element: integer (containsNull = true)
# |-- map: map (nullable = true)
# | |-- key: float
# | |-- value: string (valueContainsNull = true)
UDF karmaşık veri türlerini dönersek:
df = get_df()
df = df.groupBy('x').agg(F.collect_list('y').alias('y[]'))
df.show()
# +---+----------+
# | x| y[]|
# +---+----------+
# |0.0|[0.0, 3.0]|
# |1.0|[9.0, 6.0]|
# +---+----------+
schema = StructType([
StructField("min", FloatType(), True),
StructField("size", IntegerType(), True),
StructField("edges", ArrayType(FloatType()), True),
StructField("val_to_index", MapType(FloatType(), IntegerType()), True)
# StructField('insanity', StructType([StructField("min_", FloatType(), True), StructField("size_", IntegerType(), True)]))
])
def func(values):
mn = min(values)
size = len(values)
lst = sorted(values)[::-1]
val_to_index = {x: i for i, x in enumerate(values)}
return (mn, size, lst, val_to_index)
func = udf(func, schema)
dff = df.select('*', func('y[]').alias('complex_type'))
dff.show(10, False)
# +---+----------+------------------------------------------------------+
# |x |y[] |complex_type |
# +---+----------+------------------------------------------------------+
# |0.0|[0.0, 3.0]|[0.0,2,WrappedArray(3.0, 0.0),Map(0.0 -> 0, 3.0 -> 1)]|
# |1.0|[6.0, 9.0]|[6.0,2,WrappedArray(9.0, 6.0),Map(9.0 -> 1, 6.0 -> 0)]|
# +---+----------+------------------------------------------------------+
dff.printSchema()
# +---+----------+------------------------------------------------------+
# |x |y[] |complex_type |
# +---+----------+------------------------------------------------------+
# |0.0|[0.0, 3.0]|[0.0,2,WrappedArray(3.0, 0.0),Map(0.0 -> 0, 3.0 -> 1)]|
# |1.0|[6.0, 9.0]|[6.0,2,WrappedArray(9.0, 6.0),Map(9.0 -> 1, 6.0 -> 0)]|
# +---+----------+------------------------------------------------------+
bir UDF için birden argümanları iletme
from pyspark.sql.types import *
from pyspark.sql import functions as F
def get_df():
d = [(0.0, 0.0), (0.0, 3.0), (1.0, 6.0), (1.0, 9.0)]
df = sqlContext.createDataFrame(d, ['x', 'y'])
return df
df = get_df()
df.show()
# +---+---+
# | x| y|
# +---+---+
# |0.0|0.0|
# |0.0|3.0|
# |1.0|6.0|
# |1.0|9.0|
# +---+---+
func = udf(lambda x: str(x), StringType())
df = df.withColumn('y_str', func('y'))
func = udf(lambda x: int(x), IntegerType())
df = df.withColumn('y_int', func('y'))
df.show()
# +---+---+-----+-----+
# | x| y|y_str|y_int|
# +---+---+-----+-----+
# |0.0|0.0| 0.0| 0|
# |0.0|3.0| 3.0| 3|
# |1.0|6.0| 6.0| 6|
# |1.0|9.0| 9.0| 9|
# +---+---+-----+-----+
df.printSchema()
# root
# |-- x: double (nullable = true)
# |-- y: double (nullable = true)
# |-- y_str: string (nullable = true)
# |-- y_int: integer (nullable = true)
tamsayılar yetmez ise:
df = get_df()
func = udf(lambda arr: arr[0]*arr[1],FloatType())
df = df.withColumn('x*y', func(F.array('x', 'y')))
# +---+---+---+
# | x| y|x*y|
# +---+---+---+
# |0.0|0.0|0.0|
# |0.0|3.0|0.0|
# |1.0|6.0|6.0|
# |1.0|9.0|9.0|
# +---+---+---+
Bu kod yalnızca demo amaçlıdır, yukarıdaki tüm dönüşümler Spark kodunda bulunur ve çok daha iyi bir performans sağlar. Yukarıdaki yorumda @ 03232 gibi, UDF'ler genellikle pysparkta önlenmelidir; Geri dönen karmaşık türler, mantığınızı basitleştirmeyi düşünmenizi sağlamalıdır.
Başlığınız, gövde ile eşleşmiyor gibi görünüyor. Dokümantasyon size bir dönüş değeri nasıl * * "diğer tipte konteyner tipi" * ayarlanacağını söylemiyor mu? – jonrsharpe
@jonrsharpe Başlığı değiştirdim. Umarım şu anda vücudun temsilcisi. – kamalbanga