17

Çok parçalı bir python uygulamasına sahibim. Bir iş parçacığında bir asyncio döngüsünü çalıştırmak ve başka bir iş parçacığından, bu makaleye calbacks ve coroutines göndermek istiyorum. Kolay olmalı ama kafamı asyncio malzemenin etrafında alamıyorum. python asyncio, başka bir iş parçacığının görevleri nasıl oluşturulur ve iptal edilir

İstediğimi yarısı, bir şey üzerinde yorum yapmaktan çekinmeyin yapar aşağıdaki çözüme geldi:
import asyncio 
from threading import Thread 

class B(Thread): 
    def __init__(self): 
     Thread.__init__(self) 
     self.loop = None 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) #why do I need that?? 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def add_task(self, coro): 
     """this method should return a task object, that I 
      can cancel, not a handle""" 
     f = functools.partial(self.loop.create_task, coro) 
     return self.loop.call_soon_threadsafe(f) 

    def cancel_task(self, xx): 
     #no idea 

@asyncio.coroutine 
def test(): 
    while True: 
     print("running") 
     yield from asyncio.sleep(1) 

b.start() 
time.sleep(1) #need to wait for loop to start 
t = b.add_task(test()) 
time.sleep(10) 
#here the program runs fine but how can I cancel the task? 

b.stop() 

Yani başlayan ve döngü stoping çalışıyor. Create_task kullanarak görev oluşturma hakkında düşündüm, ancak bu yöntem threadafe değil, bu yüzden call_soon_threadsafe içine sardım. Ama görevi iptal edebilmek için görev nesnesini alabilmek istiyorum. Gelecek ve Durum'u kullanarak karmaşık bir şeyler yapabilirim, ama daha basit bir yol olmalı, değil mi?

cevap

13

add_task yönteminizi, olay döngüsünden başka bir iş parçacığından aranıp aranmadığının farkında olmanız gerekebilir. Bu şekilde, aynı iş parçacığından çağrılırsa, doğrudan asyncio.async numaralı telefonu arayabilirsiniz, aksi halde görevi iş parçacığı iş parçacığından iş parçacığına geçirmek için bazı ek işler yapabilirsiniz. Biz run yöntemde olay döngü iplik kimliği kaydetmek, bu nedenle add_task yapılan çağrılar daha sonra diğer iş parçacığı geliyor eğer çözebiliriz

import time 
import asyncio 
import functools 
from threading import Thread, current_thread, Event 
from concurrent.futures import Future 

class B(Thread): 
    def __init__(self, start_event): 
     Thread.__init__(self) 
     self.loop = None 
     self.tid = None 
     self.event = start_event 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) 
     self.tid = current_thread() 
     self.loop.call_soon(self.event.set) 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def add_task(self, coro): 
     """this method should return a task object, that I 
      can cancel, not a handle""" 
     def _async_add(func, fut): 
      try: 
       ret = func() 
       fut.set_result(ret) 
      except Exception as e: 
       fut.set_exception(e) 

     f = functools.partial(asyncio.async, coro, loop=self.loop) 
     if current_thread() == self.tid: 
      return f() # We can call directly if we're not going between threads. 
     else: 
      # We're in a non-event loop thread so we use a Future 
      # to get the task from the event loop thread once 
      # it's ready. 
      fut = Future() 
      self.loop.call_soon_threadsafe(_async_add, f, fut) 
      return fut.result() 

    def cancel_task(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 


@asyncio.coroutine 
def test(): 
    while True: 
     print("running") 
     yield from asyncio.sleep(1) 

event = Event() 
b = B(event) 
b.start() 
event.wait() # Let the loop's thread signal us, rather than sleeping 
t = b.add_task(test()) # This is a real task 
time.sleep(10) 
b.stop() 

İlk: İşte bir örnek. Olay olmayan bir döngü iş parçacığından add_task çağrılırsa, her ikisini de programlayan bir işlevi çağırmak için call_soon_threadsafe kullanırız ve ardından concurrent.futures.Future numarasını kullanarak görevi tekrar iş parçacığına ileterek Future sonucunu bekleriz.

bir görevi iptal ilgili bir not: Eğer bir Task üzerinde cancel çağırdığınızda, bir CancelledError olay döngü çalışır eşyordamın sefere arttırılacaktır. Bu, Görev'in sardığı koroutinin, bir sonraki noktaya bir akma noktası çarptığında istisnadan dolayı durdurulacağı anlamına gelir - koroutin CancelledError'u yakalamadığı ve kendi kendini iptal etmeyi önlediği sürece. Ayrıca, bunun yalnızca sarılmış olan fonksiyonun aslında bir kesilebilir koroutin olması durumunda işe yaradığını unutmayın; Örneğin, BaseEventLoop.run_in_executor tarafından döndürülen bir asyncio.Future, gerçekten iptal edilemez çünkü aslında bir concurrent.futures.Future etrafına sarılmıştır ve temel işlevleri gerçekte çalışmaya başladıktan sonra iptal edilemez. Bu durumlarda, asyncio.Future iptal edildiğini söyleyecektir, ancak aslında yürütücüde çalışan işlev çalışmaya devam edecektir.

Düzenleme: Andrew Svetlov önerisi başına, yerine queue.Queue arasında concurrent.futures.Future kullanan ilk örneğini güncellendi.

Not: asyncio.async, 3.4.4 sürümünden beri asyncio.ensure_future kullanımdan kaldırılmıştır.

+0

Örnek için verdiğim birçok sorunu düzeltmeme yardımcı oldu. Btw Ayrıca Gelecek ile Gelecek (loop = self.loop) anlamak zorunda kaldı, aksi takdirde bazı durumlarda gelecekte yanlış döngü alırsınız –

+0

@OlivierRD 'concurrent.futures.Future',' asyncio.Future' kullanmalısınız. concurrent.futures.Future' bir 'loop' anahtar kelimesi arugment almaz. – dano

+0

belgelerinin yaptığı gibi görünüyor: https://docs.python.org/3/library/asyncio-task.html#asyncio.Future –

6

Her şeyi doğru yaparsınız. asyncio sadece ana iş parçacığı için örtülü olay döngü oluşturur çünkü görev durma yapmak yöntemine

class B(Thread): 
    # ... 
    def cancel(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 

için BTW kurulum için

self.loop = asyncio.new_event_loop() 
asyncio.set_event_loop(self.loop) 

tarafından açıkça oluşturulan iş parçacığı için bir olay döngü var.

+0

eksik parça ilk etapta 'task' için kolu almak için nasıl. OP, 'add_task' yönteminde' call_soon_threadsafe (self.loop.create_task) 'işlevini kullanması gerektiğinden, aslında döngüye ekledikten sonra görevin bir tanıtıcısı yoktur. – dano

+1

Anladım. Haklısın. @dano BTW cevabınızdaki concurrent.futures.Future yerine Queue kullanabilirsiniz. Sanırım daha temiz. –

+0

Evet, 'Future' kullanmanın bir 'Queue'den daha güzel olduğunu kabul ediyorum. Cevabımı bunu yansıtmak için güncelledim. Teşekkürler! – dano

5

sadece referans için bu sitede aldığım yardım temelinde uygulamış olduğum kod, tüm özelliklere ihtiyaç duymadığımdan daha basittir. Tekrar teşekkürler! sürümü 3.4.4 asyncio yana

class B(Thread): 
    def __init__(self): 
     Thread.__init__(self) 
     self.loop = None 

    def run(self): 
     self.loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(self.loop) 
     self.loop.run_forever() 

    def stop(self): 
     self.loop.call_soon_threadsafe(self.loop.stop) 

    def _add_task(self, future, coro): 
     task = self.loop.create_task(coro) 
     future.set_result(task) 

    def add_task(self, coro): 
     future = Future() 
     p = functools.partial(self._add_task, future, coro) 
     self.loop.call_soon_threadsafe(p) 
     return future.result() #block until result is available 

    def cancel(self, task): 
     self.loop.call_soon_threadsafe(task.cancel) 
2

bir olay döngüsünün bir iş parçacığı bir eşyordam nesneyi göndermek için run_coroutine_threadsafe adlı bir işlev sağlar. Sonucu erişmek veya görevi iptal etmek için concurrent.futures.Future döndürür.

sizin örneği kullanarak: Burada

@asyncio.coroutine 
def test(loop): 
    try: 
     while True: 
      print("Running") 
      yield from asyncio.sleep(1, loop=loop) 
    except asyncio.CancelledError: 
     print("Cancelled") 
     loop.stop() 
     raise 

loop = asyncio.new_event_loop() 
thread = threading.Thread(target=loop.run_forever) 
future = asyncio.run_coroutine_threadsafe(test(loop), loop) 

thread.start() 
time.sleep(5) 
future.cancel() 
thread.join() 
+0

Bir yarış koşulundan veya kilitlenme durumundan kaçınmak için, doğrudan "future.cancel()" yi çağırma. Bunun yerine 'loop.call_soon_threadsafe (future.cancel)' kullanın. Bkz. [Burada] (https://docs.python.org/3.4/library/asyncio-dev.html#concurrency-and-multithreading). – changyuheng

+1

@ ChangYu-heng Bu, [asyncio.Future] (https://docs.python.org/3.4/library/asyncio-task.html#asyncio.Future) vadeleri için geçerlidir, ancak [run_coroutine_threadsafe] (https: // docs.python.org/3.4/library/asyncio-task.html#asyncio.run_coroutine_threadsafe) bir [concurrent.futures.Future] değerini döndürür (https://docs.python.org/3.4/library/concurrent.futures.html# concurrent.futures.Future) hangi thread-safe ve herhangi bir olay döngüsüne bağlı değildir. – Vincent

+0

@Vicent Üzgünüm Orijinal soruyu dikkatlice okumadım. Yani bunun için ek bir yorum şöyle olurdu: 'event.call_soon_threadsafe (future.cancel)' işlevini 'event.cancel()' i çalıştıracağınız iş parçacığı olan thread'dan yürütecekseniz. – changyuheng