Oldukça basit bir şey yapmaya çalışıyorum. Veri sayfamın bir parçası olarak bir datetime nesnesi var ve bir harita yaparken, tarihi belirli bir şekilde biçimlendirmek istiyorum. gönderirkenPyspark - lambda içinde arama fonksiyonu içe aktarma hatasına neden oluyor
unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\
.reduceByKey(lambda x,y: x+y)\
.collectAsMap()
Bu şu istisna neden olur: Ben özel bir işlev oluşturduk:
def format_date(dt):
"""Set this for date formatting. dt is datetime."""
return dt.strftime("%Y/%m/%d %H:%M:%S")
Ve daha sonra da, benim haritası çağrısında kullanabilirsiniz (xt bir datetime nesnedir) benim senaryom adı "run_analyses.py" ve "analysis.py" dan ithal fonksiyonların hepsi
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 9, preteckt1.softlayer.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
ImportError: No module named analysis
Not: bir iş olarak. Çok tuhaf bir şey interaktif pyspark oturumuna kodu kopyalayıp eğer mükemmel çalışıyor olmasıdır
/opt/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --total-executor-cores 12 run_analyses.py
ile işi teslim (veya format_date çağrıyı kaldırırsanız). Yeni bir sütun oluşturarak ve yeni bir sütun oluşturmak için format_date işlevimde UDF kullanarak bunu başarabilirim, ancak bu yaklaşımın neden başarısız olduğunu bilmek isterim.
Aşağıdaki eksiksiz kodun tamamını kopyaladım.
Düzenle: Doğrudan code.ms komutunu çalıştırırsam, ancak run_analysis.py dosyasından çalıştırırsam başarısız olur. Bunu daha doğru göstermek için aşağıdaki kodu değiştirdim.
run_analyses.py
import datetime, json, math, subprocess
from os.path import expanduser
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
from analysis import *
sc = SparkContext()
sqlCtx = HiveContext(sc)
ids = {}
...
my_func(sqlCtx,ids)
analysis.py
def my_func(sqlCtx,ids):
df = sqlCtx.read.format("org.apache.spark.sql.cassandra").load(table="table_name", keyspace="keyspace_name").select("id","t","val")
df = df.filter((df.t > last_week)&(df.t < now))
df = df.filter(df.val > 0)
write_vals(df)
...
def write_vals(df):
unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\
.reduceByKey(lambda x,y: x+y)\
.collectAsMap()
...
return
! Teşekkürler :) sc örneğimi sc = SparkContext (pyFiles = ['analysis.py']) olarak değiştirdi. –