2017-10-09 95 views
8

Mongo Spark bağlayıcısını bir proje için değerlendiriyorum ve tutarsız sonuçları alıyorum. MongoDB sunucu sürümü 3.4.5, Spark (PySpark üzerinden) sürüm 2.2.0, Mongo Spark Connector sürüm 2.11; 2.2.0 yerel olarak dizüstü bilgisayarımda kullanıyorum. Benim test DB için Enron veri kümesini kullanıyorum http://mongodb-enron-email.s3-website-us-east-1.amazonaws.com/ Spark SQL sorguları ile ilgileniyorum ve sayım için basit test sorgularını çalıştırmaya başladığımda her çalışma için farklı sayımlar aldım.Neden Mongo Spark bağlayıcı bir sorgu için farklı ve yanlış sayımlar döndürür?

In [1]: df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://127.0.0.1/enron_mail.messages").load() 
In [2]: df.registerTempTable("messages") 
In [3]: res = spark.sql("select count(*) from messages where headers.To='[email protected]'") 
In [4]: res.show() 
+--------+                  
|count(1)| 
+--------+ 
|  162| 
+--------+ 
In [5]: res.show() 
+--------+                  
|count(1)| 
+--------+ 
|  160| 
+--------+ 
In [6]: res = spark.sql("select count(_id) from messages where headers.To='[email protected]'") 
In [7]: res.show() 
+----------+                  
|count(_id)| 
+----------+ 
|  161| 
+----------+ 
In [8]: res.show() 
+----------+                  
|count(_id)| 
+----------+ 
|  162| 
+----------+ 

Ben bu konuda Google arandı ama yararlı bir şey bulamadık: İşte benim PySpark kabuğundan bazı çıkıştır

> db.messages.count({'headers.To': '[email protected]'}) 
203 

: İşte benim Mongo kabuğundan çıkıştır. Birisinin bunun neden olabileceği ve bunun doğru bir şekilde nasıl işleneceği hakkında bir fikri varsa, fikirlerinizi paylaşın. Belki bir şeyi özledim ya da bir şey düzgün bir şekilde yapılandırılmamış gibi hissediyorum.

UPDATE: Sorunumu çözdüm. Tutarsız sayımların nedeni, rasgele örnekleme kullanan MongoSamplePartitioner'u saran MongoDefaultPartitioner'du. Dürüst olmak gerekirse, bu benim için oldukça tuhaf bir varsayılan. Şahsen bunun yerine yavaş ama tutarlı bir bölümleyici tercih ederim. Bölümleyici seçenekleri için ayrıntılar resmi configuration options belgelerinde bulunabilir.

GÜNCELLEME: Çözümü bir yanıt olarak kopyalayın.

cevap

6

Sorunumu çözdüm. Tutarsız sayımların nedeni, rasgele örnekleme kullanan MongoSamplePartitioner'u saran MongoDefaultPartitioner'du. Dürüst olmak gerekirse, bu benim için oldukça tuhaf bir varsayılan. Şahsen bunun yerine yavaş ama tutarlı bir bölümleyici tercih ederim. Bölümleyici seçenekleri için ayrıntılar resmi configuration options belgelerinde bulunabilir.

kodu:

val df = spark.read 
    .format("com.mongodb.spark.sql.DefaultSource") 
    .option("uri", "mongodb://127.0.0.1/enron_mail.messages") 
    .option("partitioner", "spark.mongodb.input.partitionerOptions.MongoPaginateBySizePartitioner ") 
    .load() 
+0

'()' Scala zorunlu değildir. – mrsrinivas

+0

@mrsrinivas oh, Üzgünüm, bunu bilmiyordum. Daha önce birkaç scala örneğini kontrol ettim ve hepsi .load(). – artemdevel