5

I am trying to build a dictionary from a list in pyspark. What does the exception "Randomness of hash of string should be disabled via PYTHONHASHSEED" mean in pyspark? I have the following list of lists:

rawPositions 

[[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3904.125, 390412.5], 
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3900.75, 390075.0], 
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3882.5625, 388256.25], 
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3926.25, 392625.0], 
[2766232, 
    'CDX IG CDSI S25 V1 5Y CBBT CORP', 
    'BC85', 
    'Enterprise', 
    30000000.0, 
    -16323.2439825, 
    30000000.0], 
[2766232, 
    'CDX IG CDSI S25 V1 5Y CBBT CORP', 
    'BC85', 
    'Enterprise', 
    30000000.0, 
    -16928.620101900004, 
    30000000.0], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 105.0, 129596.25, 12959625.0], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 128.0, 162112.0, 16211200.0], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 135.0, 167146.875, 16714687.5], 
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 109.0, 132884.625, 13288462.5]] 

Then I parallelize the list using my SparkContext variable sc:

i = sc.parallelize(rawPositions) 
#i.collect() 

Then I try to turn it into a dictionary by grouping on element x[3] of each list entry:

j = i.groupBy(lambda x: x[3]) 
j.collect() 

This gives:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-143-6113a75f0a9e> in <module>() 
     2 #i.collect() 
     3 j = i.groupBy(lambda x: x[3]) 
----> 4 j.collect() 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py in collect(self) 
    769   """ 
    770   with SCCallSiteSync(self.context) as css: 
--> 771    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    772   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    773 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 

/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 50.0 failed 4 times, most recent failure: Lost task 14.3 in stage 50.0 (TID 7583, brllxhtce01.bluecrest.local): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream 
    for obj in iterator: 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1703, in add_shuffle_key 
    buckets[partitionFunc(k) % numPartitions].append((k, v)) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 74, in portable_hash 
    raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED") 
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926) 
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405) 
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
    at sun.reflect.GeneratedMethodAccessor31.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream 
    for obj in iterator: 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/pyspark/rdd.py", line 1703, in add_shuffle_key 
    buckets[partitionFunc(k) % numPartitions].append((k, v)) 
    File "/net/nas/uxhome/condor_ldrt-s/spark-1.6.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/rdd.py", line 74, in portable_hash 
    raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED") 
Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    ... 1 more 

I have no idea what this error is referring to... any help would be much appreciated!

answer

13

In Python 3.2.3+, the hash of str, bytes and datetime objects is salted with a random value to prevent certain kinds of denial-of-service attacks. This means that hash values are consistent within a single interpreter session but differ from session to session. PYTHONHASHSEED sets the RNG seed so that the hash is consistent across sessions.

You can easily check this in your shell. If PYTHONHASHSEED is not set you get some random values:

unset PYTHONHASHSEED 
for i in `seq 1 3`; 
    do 
    python3 -c "print(hash('foo'))"; 
    done 

## -7298483006336914254 
## -6081529125171670673 
## -3642265530762908581 

but when it is set you get the same value on every execution:

export PYTHONHASHSEED=323 
for i in `seq 1 3`; 
    do 
    python3 -c "print(hash('foo'))"; 
    done 

## 8902216175227028661 
## 8902216175227028661 
## 8902216175227028661 

Since groupBy and other operations that shuffle by key use the default partitioner, which hashes the keys in Python, PYTHONHASHSEED has to be set to the same value on every machine in the cluster to get consistent results.
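
To see why the same seed is needed everywhere, here is a minimal sketch (simplified; the real logic lives in pyspark/rdd.py as portable_hash and add_shuffle_key) of how a shuffle assigns a key to a partition. If two executor processes salt their string hashes differently, the same key would land in different partitions, which is why PySpark raises this error up front instead of silently producing wrong groups.

# Simplified illustration of hash partitioning -- not the real PySpark code.
def partition_for(key, num_partitions):
    # hash() of a str is salted per interpreter process unless PYTHONHASHSEED
    # is fixed, so two executors can disagree on this value.
    return hash(key) % num_partitions

print(partition_for('Enterprise', 8))  # varies between Python 3 runs unless the seed is fixed
print(partition_for('Jet', 8))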

See also the Runtime Environment section of the Spark Configuration docs: https://spark.apache.org/docs/latest/configuration.html#loading-default-configurations
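
One way to apply this from a script or notebook is to pass the variable through spark.executorEnv when the SparkContext is created (a sketch; the seed 321 is arbitrary, it only has to be identical everywhere):

# Sketch: propagate a fixed PYTHONHASHSEED to every executor via SparkConf.
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.executorEnv.PYTHONHASHSEED", "321")
sc = SparkContext(conf=conf)

Depending on how the driver is launched, you may also need to export PYTHONHASHSEED in the driver's own environment (for example in conf/spark-env.sh or your shell) so the driver-side hash matches the executors.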

+0

Thank you. This fixed it! – ThatDataGuy

+0

Wow, this really should be in the "getting started with Spark" tutorial. It doesn't show up by default because Spark runs on Python 2.7. Thanks, you saved me. – sudo

2

Check this out.

When running:

$SPARK_HOME/bin/spark-submit 

add:

--conf spark.executorEnv.PYTHONHASHSEED=321 
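
Putting the two pieces together, the invocation would look roughly like this (your_app.py is just a placeholder for your own script):

$SPARK_HOME/bin/spark-submit --conf spark.executorEnv.PYTHONHASHSEED=321 your_app.py
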
+0

Awesome, thanks for the tip! –