2011-03-10 10 views
5

'u kullanarak çok uzun süren görevleri nasıl iptal edebilirim 2 parçacığı FixedThreadPool ExecutorService adlı bir CompletionService kullanılarak tamamlanmış bazı Gelecekteki görevler sunarım, daha sonra verilen görevlerin sayısına eşit bir döngü ayarlayın ve tamamlama hizmetini kullanın. Hepsini() tamamlamaları veya bitirmelerini beklemek. Sorun çok nadiren bitmez (ama nedenini bilmiyorum) bu yüzden take() yöntemini bir ankete dönüştürdüm (300, Timeout.SECONDS), bu fikir bir görevi tamamlamak için 5 dakikadan uzun sürüyorsa anket başarısız olur ve daha sonra döngüden çıkacaktır ve tüm gelecekleri gözden geçirebilirim ve hatalı görevi iptal etmeyi zorlamak için future.cancel'i (doğru) arayabilirim.CompletionService

Ancak kodu çalıştırdığımda ve kilitlendiğinde, yoklamanın her 5 dakikada bir başarısız olduğunu görüyorum ve artık hiçbir görev çalışmıyor, bu yüzden iki işçinin bir şekilde kilitlendiğini ve hiçbir zaman bitmeyeceğini ve hiçbir zaman ek görevlere izin vermediğini varsayıyorum başlamak. Zaman aşımı 5 dakika olduğu için hala 1000 görev vardı ve bu süre sonunda devre dışı bırakmak için alınan sürenin çok uzun olması nedeniyle işi iptal etti.

Yapmak istediğim şey, 5 dakika içinde tamamlanmamışsa geçerli görevi geçici olarak zorla iptal etmekti, ancak bunu yapmanın bir yolunu göremiyorum.

Bu kod örneği Im, sizin Callable kesintiyi destekleyen bir çağrı engelleyen yaklaşık

import com.jthink.jaikoz.exception.JaikozException; 
import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.concurrent.*; 

public class CompletionServiceTest 
{ 
    public static void main(final String[] args) 
    { 
     CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2)); 
     Collection<Worker> tasks = new ArrayList<Worker>(10); 
     tasks.add(new Worker(1)); 
     tasks.add(new Worker(2)); 
     tasks.add(new Worker(3)); 
     tasks.add(new Worker(4)); 
     tasks.add(new Worker(5)); 
     tasks.add(new Worker(6)); 

     List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size()); 
     try 
     { 
      for (Callable task : tasks) 
      { 
       futures.add(cs.submit(task)); 
      } 
      for (int t = 0; t < futures.size(); t++) 
      { 
       Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS); 
       if(result==null) 
       { 
        System.out.println("Worker TimedOut:"); 
        continue; 
       } 
       else 
       { 
        try 
        { 
         if(result.isDone() && result.get()) 
         { 
          System.out.println("Worker Completed:"); 
         } 
         else 
         { 
          System.out.println("Worker Failed"); 
         } 
        } 
        catch (ExecutionException ee) 
        { 
         ee.printStackTrace(); 
        } 
       } 
      } 
     } 
     catch (InterruptedException ie) 
     { 
     } 
     finally 
     { 
      //Cancel by interrupting any existing tasks currently running in Executor Service 
      for (Future<Boolean> f : futures) 
      { 
       f.cancel(true); 
      } 
     } 
     System.out.println("Done"); 
    } 
} 

class Worker implements Callable<Boolean> 
{ 
    private int number; 
    public Worker(int number) 
    { 
     this.number=number; 
    } 

    public Boolean call() 
    { 
     if(number==3) 
     { 
      try 
      { 
       Thread.sleep(50000); 
      } 
      catch(InterruptedException tie) 
      { 

      } 
     } 
     return true; 
    } 
} 

çıkışı da işçi örnekte

Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker Completed: 
Worker TimedOut: 
Done 
+0

@ user294896 - Küçük, kendi kendine yeten bir örnekte bazı örnek kodlar sağlayabilir misiniz? – justkt

+0

@justkt Denebilirim, biraz zaman alabilir –

+0

Neden görev çok uzun sürüyor ve iptal edemiyor? Bu, işleri büyük ölçüde basitleştirebilir. – trojanfoe

cevap

4

Sanırım çözdüm, temelde bir zaman aşımı oluşursa, sonra gelecekteki nesneler listemde yineliyorum ve tamamlanmayan ilkini bulup zorla iptal et. Bu zarif görünmüyor ama işe yarıyor gibi görünüyor.

Yalnızca daha iyi çözümü gösteren ancak 2 dişli havuzla çalışan çıktıyı göstermek için havuzun boyutlarını değiştirdim.

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.Date; 
import java.util.List; 
import java.util.concurrent.*; 

public class CompletionServiceTest 
{ 
    public static void main(final String[] args) 
    { 
     CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1)); 
     Collection<Worker> tasks = new ArrayList<Worker>(10); 
     tasks.add(new Worker(1)); 
     tasks.add(new Worker(2)); 
     tasks.add(new Worker(3)); 
     tasks.add(new Worker(4)); 
     tasks.add(new Worker(5)); 
     tasks.add(new Worker(6)); 

     List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size()); 
     try 
     { 
      for (Callable task : tasks) 
      { 
       futures.add(cs.submit(task)); 
      } 
      for (int t = 0; t < futures.size(); t++) 
      { 
       System.out.println("Invocation:"+t); 
       Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS); 
       if(result==null) 
       { 
        System.out.println(new Date()+":Worker Timedout:"); 
        //So lets cancel the first futures we find that havent completed 
        for(Future future:futures) 
        { 
         System.out.println("Checking future"); 
         if(future.isDone()) 
         { 
          continue; 
         } 
         else 
         { 
          future.cancel(true); 
          System.out.println("Cancelled"); 
          break; 
         } 
        } 
        continue; 
       } 
       else 
       { 
        try 
        { 
         if(result.isDone() && !result.isCancelled() && result.get()) 
         { 
          System.out.println(new Date()+":Worker Completed:"); 
         } 
         else if(result.isDone() && !result.isCancelled() && !result.get()) 
         { 
          System.out.println(new Date()+":Worker Failed"); 
         } 
        } 
        catch (ExecutionException ee) 
        { 
         ee.printStackTrace(System.out); 
        } 
       } 
      } 
     } 
     catch (InterruptedException ie) 
     { 
     } 
     finally 
     { 
      //Cancel by interrupting any existing tasks currently running in Executor Service 
      for (Future<Boolean> f : futures) 
      { 
       f.cancel(true); 
      } 
     } 
     System.out.println(new Date()+":Done"); 
    } 
} 

class Worker implements Callable<Boolean> 
{ 
    private int number; 
    public Worker(int number) 
    { 
     this.number=number; 
    } 

    public Boolean call() 
     throws InterruptedException 
    { 
     try 
     { 
      if(number==3) 
      { 
       Thread.sleep(50000); 
      } 
     } 
     catch(InterruptedException ie) 
     { 
      System.out.println("Worker Interuppted"); 
      throw ie; 
     } 
     return true; 
    } 
} 

Çıktı Her zaman sonra future.cancel() çağırabilir ... henüz bitiremedi eğer future.get(timeout...)
O zaman aşımı istisna dönecektir çağırabilir

Invocation:0 
Thu Mar 10 20:51:39 GMT 2011:Worker Completed: 
Invocation:1 
Thu Mar 10 20:51:39 GMT 2011:Worker Completed: 
Invocation:2 
Thu Mar 10 20:51:49 GMT 2011:Worker Timedout: 
Checking future 
Checking future 
Checking future 
Cancelled 
Invocation:3 
Worker Interuppted 
Invocation:4 
Thu Mar 10 20:51:49 GMT 2011:Worker Completed: 
Invocation:5 
Thu Mar 10 20:51:49 GMT 2011:Worker Completed: 
Thu Mar 10 20:51:49 GMT 2011:Done 
2

konuşurken ne basitleştirilmiş versiyonunu gösterir. Gerçek kodunuz dahili bir kilit üzerinde kilitleniyorsa (synchronized blok), kesinti ile iptal edemezsiniz. Bunun yerine, kilit edinimi için ne kadar beklemek istediğinizi belirtmenize izin veren açık bir kilit (java.util.concurrent.Lock) kullanabilirsiniz. Bir iş parçacığı bir kilitlenme beklerken zaman aşımına uğrarsa, muhtemelen bir kilitlenme durumuyla karşılaştığı için bir hata iletisi ile iptal edilebilir. Bu örnekte, sizin örneğinizde, Callable, InterruptedException'u yutmamalıdır. Ya geçmeli (rethrow veya metot deklarasyonunun atış çizgisine InterruptedException ekleyiniz) ya da catch bloğunda, iş parçasının kesintili durumunu sıfırlayın (Thread.currentThread().interrupt() ile).

+0

İşçi kodu, gerçekten işe koyduğum gerçek işçiyi temsil etmiyor, böylece kodun çalışması için. Soru, CompletionService/ExecutorService ile ilgili bir problemin, hangi geleceğin çalıştığını/problemi çözdüğünü bilmediğim bir hizmetten nasıl iptal edebilirim. –

+0

ps: Kodunuzdaki stratejik noktalarda Thread.currentThread(). IsInterrupted() öğesini da kontrol etmelisiniz. InterruptedException, bunu destekleyen bir engelleme işleminde olmadığınız sürece genellikle atılmaz. IsInterrupted() yöntemini kullanarak bir kesme işleminin gerçekleşip gerçekleşmediğini kontrol etmek ve görevinizi durdurup temizledikten sonra kontrol etmek size kalmıştır. – Matt

1

olduğunu.