Bir Message Queue öğesinden (Amazon SQS) öğeleri tüketmek için aracıları kullanmanın en iyi yolunu bulmaya çalışıyorum. Şu anda kuyrudan bir öğe kaplayan ve işleyen bir işlevim var (süreç kuyruğu).Kuyruk satırını tüketen Clojure ajanları
Bu öğeleri eşzamanlı olarak işlemek istiyorum, ancak ajanları nasıl kontrol edeceğime dair başımı saramam. Temel olarak, tüm ajanları, kuyruktan çok sayıda eşyaya çekmeden ve bir iş listesi geliştirmeden olabildiğince meşgul etmek istiyorum (bunu birkaç makinede çalışıyorum), böylece öğeler kuyrukta bırakılsın gerçekten ihtiyaç vardır).
Uygulamamı iyileştirme konusunda bana bazı işaretçiler verebilir mi?
(def active-agents (ref 0))
(defn process-queue-item [_]
(dosync (alter active-agents inc))
;retrieve item from Message Queue (Amazon SQS) and process
(dosync (alter active-agents dec)))
(defn -main []
(def agents (for [x (range 20)] (agent x)))
(loop [loop-count 0]
(if (< @active-agents 20)
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent))
;should skip this agent until later if it is still busy processing (not sure how)
(send-off agent process-queue-item)))
;(apply await-for (* 10 1000) agents)
(Thread/sleep 10000)
(logging/info (str "ACTIVE AGENTS " @active-agents))
(if (> 10 loop-count)
(do (logging/info (str "done, let's cleanup " count))
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent)))
(apply await agents)
(shutdown-agents))
(recur (inc count)))))
siz ileti sırası tedavi etmemin bir yolu var mı Bir sq ve sonra paralelleştirme almak için sadece pmap kullanın? –
@Alex Stoddard: Benim durumumda, process-queue-item aslında ağdaki IO'yu engeller, bu yüzden pmap'in doğru seçim olduğunu düşünmüyorum çünkü sadece makinenin çekirdeği olduğu kadar çok iplik kullandığı için. – erikcw
@erikw: Elbette, ama bu sadece bir pmap uygulama detayıdır (thread = #cores + 2). Parametre sayısı olan bir pmap sürümü yazamaya gerek yok.Pmap kaynağının ilk satırına bakın: (let [n (+ 2 (.. Çalışma zamanı getRuntime availableProcessors)) –