2016-04-07 11 views
1

Oldukça basit bir şey yapmaya çalışıyorum. Veri sayfamın bir parçası olarak bir datetime nesnesi var ve bir harita yaparken, tarihi belirli bir şekilde biçimlendirmek istiyorum. gönderirkenPyspark - lambda içinde arama fonksiyonu içe aktarma hatasına neden oluyor

unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\ 
     .reduceByKey(lambda x,y: x+y)\ 
     .collectAsMap() 

Bu şu istisna neden olur: Ben özel bir işlev oluşturduk:

def format_date(dt): 
    """Set this for date formatting. dt is datetime.""" 
    return dt.strftime("%Y/%m/%d %H:%M:%S") 

Ve daha sonra da, benim haritası çağrısında kullanabilirsiniz (xt bir datetime nesnedir) benim senaryom adı "run_analyses.py" ve "analysis.py" dan ithal fonksiyonların hepsi

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 9, preteckt1.softlayer.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main 
    command = pickleSer._read_with_length(infile) 
    File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
ImportError: No module named analysis 

Not: bir iş olarak. Çok tuhaf bir şey interaktif pyspark oturumuna kodu kopyalayıp eğer mükemmel çalışıyor olmasıdır

/opt/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --total-executor-cores 12 run_analyses.py 

ile işi teslim (veya format_date çağrıyı kaldırırsanız). Yeni bir sütun oluşturarak ve yeni bir sütun oluşturmak için format_date işlevimde UDF kullanarak bunu başarabilirim, ancak bu yaklaşımın neden başarısız olduğunu bilmek isterim.

Aşağıdaki eksiksiz kodun tamamını kopyaladım.

Düzenle: Doğrudan code.ms komutunu çalıştırırsam, ancak run_analysis.py dosyasından çalıştırırsam başarısız olur. Bunu daha doğru göstermek için aşağıdaki kodu değiştirdim.

run_analyses.py

import datetime, json, math, subprocess 
from os.path import expanduser 
from pyspark import SparkContext 
from pyspark.sql import SQLContext, HiveContext 
from analysis import * 

sc = SparkContext() 
sqlCtx = HiveContext(sc) 
ids = {} 
... 
my_func(sqlCtx,ids) 

analysis.py

def my_func(sqlCtx,ids): 
    df = sqlCtx.read.format("org.apache.spark.sql.cassandra").load(table="table_name", keyspace="keyspace_name").select("id","t","val") 
    df = df.filter((df.t > last_week)&(df.t < now)) 
    df = df.filter(df.val > 0) 
    write_vals(df) 
    ... 

def write_vals(df): 
    unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\ 
      .reduceByKey(lambda x,y: x+y)\ 
      .collectAsMap() 
    ... 
    return 

cevap

2

anahtar traceback içinde:

ImportError: No module named analysis 

PySpark çalışan işlemi yok olduğunu anlatıyor analysis.py dosyasına erişim. Eğer SparkContext başlatmak zaman işçiye kopyalanması gerekir dosyaların bir listesini geçirebilirsiniz:

sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) 

fazla bilgi: did it https://spark.apache.org/docs/0.9.0/python-programming-guide.html#standalone-use

+0

! Teşekkürler :) sc örneğimi sc = SparkContext (pyFiles = ['analysis.py']) olarak değiştirdi. –