2013-07-19 20 views
6

süreçleri çalıştırmak için çoklu işlem yakamazsın ve emin değilim _run() işlevi. Tüm süreçler yaratılmış gibi görünüyor - ama bunu tek bir süreçle yaptığımdan daha hızlı değil. Temel olarak, run_**_normalizers() işlevlerinde gerçekleşmekte olan, bir veritabanındaki bir sıra tablosundan (SQLAlchemy) okuyor, ardından birkaç HTTP isteği yapıyor ve ardından verileri değiştirmek için bir 'boru hattı' çalıştırıyor ve sonra veritabanına geri kaydediyor. İpliklerin “ağır” olduğu ve sıklıkla paralellik için kullanıldığı JVM topraklarından geliyorum - çok işlemcili modülün Python'un GIL'sinin sınırlarını aşacağını düşündüğümden bununla biraz kafam karıştı.Aşağıdaki kod aynı anda çalıştırmak için görünmüyor eşzamanlı

+0

Çok işlemcili modül, iş parçacığı değil süreçleri kullanır. Bu nedenle GIL'den etkilenmez. –

+0

Kodunuzu test ettik ve gerekli teknik Tamam. 'Config' sözlük çok kullanılıyorsa, paylaşılan 'config' hakkında emin değilim, bu teoride yavaş şeyler aşağı olabilir. İşlemcinin senin darboğaz olmasının mümkün değil. –

+0

Sadece iş istasyonumda çalıştırıyorum, 8 çekirdek 16 GB RAM Linux. 1 veya 1, 8 veya 16 ile hiçbir şey değişmez - sistem kaynakları iyidir. –

cevap

3

Çok işlemcili sorunumu giderdim ve aslında iş parçacıklarını değiştirdim. Aslında ne düşündüğünü tam olarak emin değilim - her şeyi yeniden tasarladım, işçileri ve görevleri yaptım ve şimdi olmayan şeyler ve şeyler. İşte ne yaptım temellerini var:

import abc 
from Queue import Empty, Queue 
from threading import Thread 

class AbstractTask(object): 
    """ 
     The base task 
    """ 
    __metaclass__ = abc.ABCMeta 

    @abc.abstractmethod 
    def run_task(self): 
     pass 

class TaskRunner(object): 

    def __init__(self, queue_size, num_threads=1, stop_on_exception=False): 
     super(TaskRunner, self).__init__() 
     self.queue    = Queue(queue_size) 
     self.execute_tasks  = True 
     self.stop_on_exception = stop_on_exception 

     # create a worker 
     def _worker(): 
      while self.execute_tasks: 

       # get a task 
       task = None 
       try: 
        task = self.queue.get(False, 1) 
       except Empty: 
        continue 

       # execute the task 
       failed = True 
       try: 
        task.run_task() 
        failed = False 
       finally: 
        if failed and self.stop_on_exception: 
         print('Stopping due to exception') 
         self.execute_tasks = False 
        self.queue.task_done() 

     # start threads 
     for i in range(0, int(num_threads)): 
      t = Thread(target=_worker) 
      t.daemon = True 
      t.start() 


    def add_task(self, task, block=True, timeout=None): 
     """ 
      Adds a task 
     """ 
     if not self.execute_tasks: 
      raise Exception('TaskRunner is not accepting tasks') 
     self.queue.put(task, block, timeout) 


    def wait_for_tasks(self): 
     """ 
      Waits for tasks to complete 
     """ 
     if not self.execute_tasks: 
      raise Exception('TaskRunner is not accepting tasks') 
     self.queue.join() 

all i do) (wait_for_tasks çağrı ardından TaskRunner oluşturun ve (bunlardan binlerce) için görev ekleyebilir ve olduğunu. Açıkçası yeniden mimaride açıkça yaptığım başka bir sorunu çözdüm. Garip olsa da.

1

hala bir çoklu işlem çözüm arıyorsanız, önce işçi havuzu nasıl kullanılacağını kontrol etmek isteyebilirsiniz, o zaman kendi başınıza NUM_THREADS süreçlerini yönetmek için olmazdı: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

Ve Yavaşlama sorunu için, config nesnesini _run işlevine argüman olarak geçmeyi denediniz mi? Bunun dahili olarak nasıl bir değişiklik yapacağını/nasıl yapacağını bilmiyorum, ama bir şeyi değiştirebileceğini tahmin ediyorum.