2011-04-04 19 views
9

Küçük bir işçiler havuzum var (4) ve çok büyük bir görev listesi (5000 ~). Bir havuz kullanıyorum ve görevleri map_async() ile gönderiyorum. Çalıştığım görev oldukça uzun olduğu için, 1 uzunluğundaki bir parçayı zorluyorum, böylece uzun bir süreç daha kısa olanları tutamaz.Python: Çok işlemcili bekleyen bekleyen görevlerin sayısını nasıl kontrol edebilirim?

Ne yapmak istediğim, kaç tane görevin gönderilmeye alındığını periyodik olarak kontrol etmektir. En fazla 4'ün aktif olacağını biliyorum, kaç tane işlenecek ile ilgileniyorum.

Etrafımda dolaştım ve bunu yapan kimseyi bulamıyorum. Bildiğim

import multiprocessing 
import time 

def mytask(num): 
    print('Started task, sleeping %s' % num) 
    time.sleep(num) 

pool = multiprocessing.Pool(4) 
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1) 
pool.close() 

while True: 
    if not jobs.ready(): 
     print("We're not done yet, %s tasks to go!" % <somethingtogettasks>) 
     jobs.wait(2) 
    else: 
     break 
+0

Bir RHEL-6 sisteminde python2.6 kullanıyorum ancak farklı sürümler/platformlar üzerinde örneklere açık olduğumu belirtmeliyim. Görev tamamlandığında – jkeating

+0

statik değişken azalır mı? (ve görev açıkça başladığında artar). – Enders

+0

Görevler, çalışanlar gelene kadar "başlatılmıyor". Yapılması gereken görevlerin büyüklüğü olan bir global yaratmış olsaydım, o zaman bunu başarabilecek bir görev başlatıldığında her seferinde azaldıysam, ama bu biraz garip ve bazı iplik güvenliği düşüncesi gerektiriyorsa. – jkeating

cevap

6

Jobs._number_left istediğiniz gibi görünüyor. _ geliştiricilerin kaprisinde değişebilen bir iç değer olduğunu belirtir, ancak bu bilgiyi almanın tek yolu gibi görünüyor.

+0

Ah! API dokümanlarında değildi ve ipython'daki işlerde bir dir() yapmayı unutmuştum. Cevap için teşekkürler! – jkeating

1

Hiçbir hava geçirmez şekilde, ancak map_async yerine Pool.imap_unordered() fonksiyonunu kullanırsanız, işlenir unsurları devreye girebilen:

Bazı basit kod

yardımcı olur.

import multiprocessing 
import time 

process_count = 4 

def mytask(num): 
    print('Started task, sleeping %s' % num) 
    time.sleep(num) 
    # Actually, you should return the job you've created here. 
    return num 

pool = multiprocess.Pool(process_count) 
jobs = [] 
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4] 
job_count = 0 
for job in pool.imap_unordered(mytask, items): 
    jobs.append(job) 
    job_count += 1 

    incomplete = len(items) - job_count 
    unsubmitted = max(0, incomplete - process_count) 

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted 

pool.close() 

hemen hemen tüm işlemler, iki istisna dışında biri ile işlenmesi olacağını varsayabiliriz çünkü process_count çıkarılarak ediyorum: Daha fazla ürün tüketmek için sol Eğer bir yineleyici kullanırsanız 1), olmayabilir ve süreç ve 2) 4 öğeden daha az kalmış olabilirsiniz. İlk istisna için kodlamadım. Ama gerekiyorsa bunu yapmak oldukça kolay olmalı. Her neyse, örneğiniz bir liste kullanmaktadır, bu yüzden bu soruna sahip olmamanız gerekir.

Düzenleme: Ayrıca, bir süre döngü kullandığınızı fark ettim, bu da düzenli aralıklarla, örneğin, her yarım saniyede bir şey güncellemeye çalıştığınızı gösteriyor. Örnek olarak verdiğim kod bu şekilde yapmayacaktır. Bu bir problem olup olmadığından emin değilim.

+0

Teşekkürler. İmap fonksiyonlarını gerçekten araştırmamıştım (doktorlar biraz ... gerisiydi). Haklısın, işler devam ederken başka şeyler yapmak isterim ve kaç iş kaldığını periyodik olarak rapor ederim. – jkeating

1

Benzer gereksinimlerim var: ilerlemeyi izleme, sonuçlara göre ara iş gerçekleştirme, tüm işlemlerin rasgele bir zamanda temiz bir şekilde durdurulması. Bununla nasıl uğraştığım, apply_async ile görevleri tek tek göndermek. Benim yaptığım bir ağır basitleştirilmiş versiyonu: Bir Queue yerine sonuçları ing return kullanımı

maxProcesses = 4 
q = multiprocessing.Queue() 
pool = multiprocessing.Pool() 
runlist = range(100000) 
sendcounter = 0 
donecounter = 0 
while donecounter < len(runlist): 
    if stopNowBooleanFunc(): # if for whatever reason I want to stop processing early 
     if donecounter == sendcounter: # wait til already sent tasks finish running 
      break 
    else: # don't send new tasks if it's time to stop 
     while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses: 
      pool.apply_async(mytask, (runlist[sendcounter], q)) 
      sendcounter += 1 

    while not q.empty(): # process completed results as they arrive 
     aresult = q.get() 
     processResults(aresult) 
     donecounter += 1 

Not.