2016-01-25 17 views
13

Ben RxJava son derece hesaplama, paralel hale görevler için, geleneksel ExecutorService bir Scheduler daha hızlı olacağını bir his vardı. RxJava - Zamanlayıcılar vs ExecutorService?

Bu kod

Observable<MyItem> source = ... 

source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.computation())) 
.subscribe(); 

Ben iş yerinde yapmak tipik bir paralel işlem ile bu iki yaklaşımı karşılaştırıldığında bu

final ExecutorService svc = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); 
Observable<MyItem> source = ... 

source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.from(svc))) 
.finallyDo(svc::shutdown) 
.subscribe(); 

daha yavaş aday olacağını bir teori vardı ve ben şu var Sonuçlar.

EXECUTOR 

START: 2016-01-25T09:47:04.350 
END: 2016-01-25T09:48:37.181 
TOTAL TIME (SEC): 92 


COMPUTATION SCHEDULER 

START: 2016-01-25T09:50:37.799 
END: 2016-01-25T09:54:23.674 
TOTAL TIME (SEC): 225 

Yani benim kaba test geleneksel ExecutorService hesaplaması için bir Scheduler çok daha hızlıdır göstermiştir.

Bu sonuçlar için bir sebep var mı? RxJava zamanlayıcıları sadece paralelleştirme için optimize edilmiyor mu? Hesaplama planlayıcılarının Executors'dan daha az ileti kullandığı izlenimini aldım.

+0

Her iki durumda da öğeleri paralel olarak işliyor. Paralel yürütmeyi istiyorsanız, [RxJavaParallel] deney kütüphanesine göz atmalısınız (https://github.com/ReactiveX/RxJavaParallel). –

+0

Yine de, ExecutorService'nin neden Schedulers.computation() 'dan daha hızlı olduğu konusunda hala güzel bir soru. Buna cevap vermeye yetkili değilim. –

+0

Bence bu proje bir sene boyunca uzmandır, çünkü RxJava devleri şu andaki zamanlarına öncelik veremezler. – tmn

cevap

7

Birkaç test yaptım ve kendi ExecutorService'unuzu yaratmanın aslında paralelleştirme performansını artırabildiğini keşfettim. I wrote a blog post on it here.

2

Schedulers.computation()'u kullandığınızda, tüm olaylar aynı iş parçacığında işlenir. Kaynak kodu CachedThreadScheduler.java ve NewThreadWorker.java'a başvurabilirsiniz. Bu uygulamanın yararı, eventB'nin eventB'den sonra yayınlanması durumunda, eventA olayından sonra eventA ele alınacaktır.

Schedulers.from()'u kullandığınızda, olaylar farklı iş parçacıklarında işlenir.