2016-04-06 29 views
0

Benim RDD bir alan başka bir haritadan başka bir haritaya eşleştirmek için bir ihtiyacım var UserDAO.users Burada eşleme anlamaya çalıştım ama username geri dönemezsiniz henüz. Ben bir foreach İşte [email protected]Scala - bir haritada bir harita bir harita başka bir haritaya

yazdırırım zaman güncellenen haritasında bu alıyorum benim kod parçacığı geçerli: - RDD[Map[String, String]]

UserDAO.users - Future[Seq[User]] -

rdd.map { l => { 
     l.map { case (k, v) => { 
     k match { 
      case "a_userid" => { 
      l.updated("a_username", userDAO.users.map(c => c.filter(f => f.userid == v.toInt)).map(y => y.map(e => e.username))) 
      } 
      case _ => 
      } 
      } 
     } 
     } 
    } 

Yani temelde,

rdd nerede Kullanıcı,

numaralı bir kasa sınıfıdır ve güncelleştirilmiş rdd-RDD[Map[String, String]]

-

Herhangi bir fikir bunu çözmek için?

Teşekkür

+0

Lütfen değişkenlerinizin türlerini belirtebilir misiniz? Özellikle, "rdd" ve "userDAO.users". Ayrıca beklenen sonucun türünü de veriniz. – Aivean

+0

@Aivean sorumu güncelledim, teşekkürler –

+0

tamam, sonraki soru, Kullanıcı dizisi ne kadar büyük, 'userDAO.users' tarafından döndürüldü? İşçi düğümlerinde ('rdd.map') daha önce gerçekleştirilmesi yerine onu çağırmak için herhangi bir sebep var mı? – Aivean

cevap

1

Ben o iş yapmak için kodunuzu tekrar yazmış. Lütfen unutmayın, engellemeyi gerektirir, aksi halde beton RDD[Map[String, String]] almak için başka bir yolu yoktur.

Netlik için rdd.map bölümüne başvurdum.

İlk varyant. Kullanıcıları map içindeki kullanım yaklaşımını kullandım. Tüm kullanıcıların, yani 11 milyon kez yineleme başına her zaman okunacak olarak, bu son derece verimsiz olduğunu lütfen unutmayın:

// rdd.map ommitted 
l.get("a_userid").flatMap { 
    userId:String => 
    val newUserName:Option[String] = 
     Await.result(userDAO.users 
     .map(c => c.find(f => f.userid == userId.toInt)) 
     .map(y => y.map(e => e.username)), 
     30 seconds 
    ) 
    newUserName.map(l.updated("a_username", _)) 
}.getOrElse(l) 

Alternatif yaklaşım önceden harita kullanıcıların okumasını gerektirir. Bu harita daha sonra tüm kıvılcım işçilerine yayınlanacak. Haritanız o kadar büyük değil, iyi. Hızlı bir şekilde RDD üzerinden yineleme başına yalnızca tek harita araması yaptığınız için bu yaklaşım daha verimlidir.

val users:Map[Int, String] = Await.result(userDAO.users 
    .map(uss => uss.map(u => u.userid -> u.username).toMap), 
    30 seconds 
) 

// rdd.map ommitted 
l.get("a_userid").flatMap { 
    userId:String => 
    users.get(userId.toInt).map(l.updated("a_username", _)) 
}.getOrElse(l) 

UPD: Sadece şeyiyle uğruna, burada başka bir asenkron türüdür:

userDAO.users 
    .map(uss => uss.map(u => u.userid -> u.username).toMap) 
    .map { users:Map[Int, String] => 
     rdd.map { l:Map[String, String] => 
     l.get("a_userid").flatMap { 
      userId:String => 
      users.get(userId.toInt).map(l.updated("a_username", _)) 
     }.getOrElse(l) 
     } 
    } 

Bu variant2 aynı yaklaşımı takip eder, fakat bunun yerine somut sonucun Future[RDD[Map[String, String]]] döndürür.

+0

Düşüncelerinizi almaktan çok memnun kaldım, teşekkürler. –