2010-04-08 11 views
26

Bir Message Queue öğesinden (Amazon SQS) öğeleri tüketmek için aracıları kullanmanın en iyi yolunu bulmaya çalışıyorum. Şu anda kuyrudan bir öğe kaplayan ve işleyen bir işlevim var (süreç kuyruğu).Kuyruk satırını tüketen Clojure ajanları

Bu öğeleri eşzamanlı olarak işlemek istiyorum, ancak ajanları nasıl kontrol edeceğime dair başımı saramam. Temel olarak, tüm ajanları, kuyruktan çok sayıda eşyaya çekmeden ve bir iş listesi geliştirmeden olabildiğince meşgul etmek istiyorum (bunu birkaç makinede çalışıyorum), böylece öğeler kuyrukta bırakılsın gerçekten ihtiyaç vardır).

Uygulamamı iyileştirme konusunda bana bazı işaretçiler verebilir mi?

(def active-agents (ref 0)) 

(defn process-queue-item [_] 
    (dosync (alter active-agents inc)) 
    ;retrieve item from Message Queue (Amazon SQS) and process 
    (dosync (alter active-agents dec))) 

(defn -main [] 
    (def agents (for [x (range 20)] (agent x))) 

    (loop [loop-count 0] 

    (if (< @active-agents 20) 
     (doseq [agent agents] 
     (if (agent-errors agent) 
      (clear-agent-errors agent)) 
     ;should skip this agent until later if it is still busy processing (not sure how) 
     (send-off agent process-queue-item))) 

    ;(apply await-for (* 10 1000) agents) 
    (Thread/sleep 10000) 
    (logging/info (str "ACTIVE AGENTS " @active-agents)) 
    (if (> 10 loop-count) 
     (do (logging/info (str "done, let's cleanup " count)) 
     (doseq [agent agents] 
     (if (agent-errors agent) 
      (clear-agent-errors agent))) 
     (apply await agents) 
     (shutdown-agents)) 
     (recur (inc count))))) 
+0

siz ileti sırası tedavi etmemin bir yolu var mı Bir sq ve sonra paralelleştirme almak için sadece pmap kullanın? –

+0

@Alex Stoddard: Benim durumumda, process-queue-item aslında ağdaki IO'yu engeller, bu yüzden pmap'in doğru seçim olduğunu düşünmüyorum çünkü sadece makinenin çekirdeği olduğu kadar çok iplik kullandığı için. – erikcw

+0

@erikw: Elbette, ama bu sadece bir pmap uygulama detayıdır (thread = #cores + 2). Parametre sayısı olan bir pmap sürümü yazamaya gerek yok.Pmap kaynağının ilk satırına bakın: (let [n (+ 2 (.. Çalışma zamanı getRuntime availableProcessors)) –

cevap

6

Ne istiyorsunuz, bazı üst sınırlarla görevleri teslim etmeye devam etmenin bir yoludur. Buna basit bir yaklaşım, limiti koordine etmek için bir semafor kullanmaktır. Bunun üzerine

(let [limit (.availableProcessors (Runtime/getRuntime)) 
     ; note: you might choose limit 20 based upon your problem description 
     sem (java.util.concurrent.Semaphore. limit)] 
    (defn submit-future-call 
    "Takes a function of no args and yields a future object that will 
    invoke the function in another thread, and will cache the result and 
    return it on all subsequent calls to deref/@. If the computation has 
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks 
    until the completion of another future, where n is the number of 
    available processors." 
    [#^Callable task] 
    ; take a slot (or block until a slot is free) 
    (.acquire sem) 
    (try 
     ; create a future that will free a slot on completion 
     (future (try (task) (finally (.release sem)))) 
     (catch java.util.concurrent.RejectedExecutionException e 
     ; no task was actually submitted 
     (.release sem) 
     (throw e))))) 

(defmacro submit-future 
    "Takes a body of expressions and yields a future object that will 
    invoke the body in another thread, and will cache the result and 
    return it on all subsequent calls to deref/@. If the computation has 
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks 
    until the completion of another future, where n is the number of 
    available processors." 
    [& body] `(submit-future-call (fn [] [email protected]))) 

#_(example 
    user=> (submit-future (reduce + (range 100000000))) 
    #<[email protected]: :pending> 
    user=> (submit-future (reduce + (range 100000000))) 
    #<[email protected]: :pending> 
    user=> (submit-future (reduce + (range 100000000))) 
    ;; blocks at this point for a 2 processor PC until the previous 
    ;; two futures complete 
    #<[email protected]: :pending> 
    ;; then submits the job 

yerde şimdi sadece görevleri kendileri alınır nasıl koordine etmek gerekir: İşte ben yaklaşım şekli şöyledir. Bunu yapmak için zaten mekanizmalara sahipmişsiniz gibi geliyor. Döngü (submit-future (process-queue-item))

4

Belki seque işlevini kullanabilirsiniz? (doc seque) alıntı:

clojure.core/seque 
([s] [n-or-q s]) 
    Creates a queued seq on another (presumably lazy) seq s. The queued 
    seq will produce a concrete seq in the background, and can get up to 
    n items ahead of the consumer. n-or-q can be an integer n buffer 
    size, or an instance of java.util.concurrent BlockingQueue. Note 
    that reading from a seque can block if the reader gets ahead of the 
    producer. 

Ne aklınızda olan ağ üzerinden kuyruk öğeleri alma tembel dizisidir; Bunu seque içine sarın, bunu bir Ref'ye koyun ve çalışan Ajanlar öğelerinizi bu seque ürününden tüketmeniz gerekir. seque, kodun bakış açısından düzenli bir sekme gibi görünen bir şey döndürür; sıra büyüsü şeffaf bir şekilde gerçekleşir. İçine koyduğunuz dizi parçalanmışsa, o zaman yine de bir parçaya zorlanacağını unutmayın. Ayrıca, ilk aramanın seque'a yapılan ilk çağrının, bir ilk öğe veya iki tanesi elde edilene kadar (ya da durum olarak bir parçaya kadar) engellenmiş gibi gözüktüğünü düşünüyorum; bunun, tembel dizilerin seque'un kendisinden daha iyi olduğunu düşünüyorum).

(hiç test bir gerçekten kabataslak bir,) bir kod kroki: sorabilir böylece Aslında

(defn get-queue-items-seq [] 
    (lazy-seq 
    (cons (get-queue-item) 
     (get-queue-items-seq)))) 

(def task-source (ref (seque (get-queue-items-seq)))) 

(defn do-stuff [] 
    (let [worker (agent nil)] 
    (if-let [result 
      (dosync 
       (when-let [task (first @task-source)] 
       (send worker (fn [_] (do-stuff-with task)))))] 
     (do (await worker) 
      ;; maybe do something with worker's state 
      (do-stuff))))) ;; continue working 

(defn do-lots-of-stuff [] 
    (let [fs (doall (repeatedly 20 #(future (do-stuff))))] 
    fs))) 

muhtemelen sıra maddenin seq daha karmaşık bir yapımcı isterdim Yeni nesneler üretmeyi durdurmak (her şey incelikle kapatılabiliyorsa bir zorunluluktur; görev kaynağı kuruduğunda gelecekler ölecek, daha önce yaptıklarını görmek için future-done? kullanın). Ve bu sadece ilk bakışta görebildiğim bir şey ... Eminim ki burada cilalamak için daha çok şey var. Yine de genel yaklaşımın işe yarayacağını düşünüyorum.

+0

Vadeli işlemlerin gerçekte oluşturulacağı kod taslağının son-ama-bir satırına bir düzeltme ekledim. (Bütün fikir için çok önemli, gerçekten ... :-)) –

+0

Bu kodu anlamaya çalışıyorum. Görev kaynağı neden bir referanstır? Onu hiçbir zaman değiştiremezsin. –

+0

@Siddhartha Reddy: İlk bakışta bu yüzden "* gerçekten * yarım yamalak" kodunu aradığımı söyleyebilirim. ;-) Sanırım, "dosync" içinde "ne zaman-let" işlevinin yararlı olması için bir "(görev-görev değişimini değiştir)" (ya da "sonraki") gerekir. Aslına bakarsan bunu tekrar düşünürüm, merak ediyorum, burada “seque” kullanmak çok iyi bir fikirdir; bana öyle geliyor ki, yerel makinenin çökmesi durumunda kaybolacak olan kuyruğa giren eşyaların sayısını arttırıyor (çünkü "işçiler tarafından talep edilmeden önce eşyalarını çekiyorlar"). Sonra tekrar, bazı senaryolarda iyi bir performans olabilir; Yine de –

23
(let [switch (atom true) ; a switch to stop workers 
     workers (doall 
       (repeatedly 20 ; 20 workers pulling and processing items from SQS 
        #(future (while @switch 
          (retrieve item from Amazon SQS and process)))))] 
    (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-) 
    (reset! switch false) ; stop ! 
    (doseq [worker workers] @worker)) ; waiting for all workers to be done 
+2

Bu artık 1.4 ile çalışmıyor ('gelecekteki 've' gelecekteki çağrı' 'tekrar tekrar 'gerektiren' IFn' döndürmez. Bir geleceği kolayca bir işlevle doldurabilirsiniz, ancak 'geleceğin' # 'ile birlikte gelmesine rağmen, –

+3

@AlexB iyi yakalama, 1.4 sorun bile değil: # orada olmalıydım. Teşekkürler! – cgrand

0

Hala dil ile bir acemi değilim, ama şu çözüm benim için çalışıyor olarak bu, ne kadar deyimsel emin değil:

(let [number-of-messages-per-time 2 
     await-timeout 1000] 
    (doseq [p-messages (partition number-of-messages-per-time messages)] 
    (let [agents (map agent p-messages)] 
     (doseq [a agents] (send-off a process)) 
     (apply await-for await-timeout agents) 
     (map deref agents))))